diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index daf9b0f95273e98cdd46cce8dbb1d910f522134a..d673cb0946639e8c320da778c06a25415b87b8fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1101,7 +1101,6 @@ class DAGScheduler(
             s" ${task.stageAttemptId} and there is a more recent attempt for that stage " +
             s"(attempt ID ${failedStage.latestInfo.attemptId}) running")
         } else {
-
           // It is likely that we receive multiple FetchFailed for a single stage (because we have
           // multiple tasks running concurrently on different executors). In that case, it is
           // possible the fetch failure has already been handled by the scheduler.
@@ -1117,6 +1116,11 @@ class DAGScheduler(
           if (disallowStageRetryForTest) {
             abortStage(failedStage, "Fetch failure will not retry stage due to testing config",
               None)
+          } else if (failedStage.failedOnFetchAndShouldAbort(task.stageAttemptId)) {
+            abortStage(failedStage, s"$failedStage (${failedStage.name}) " +
+              s"has failed the maximum allowable number of " +
+              s"times: ${Stage.MAX_CONSECUTIVE_FETCH_FAILURES}. " +
+              s"Most recent failure reason: ${failureMessage}", None)
           } else if (failedStages.isEmpty) {
             // Don't schedule an event to resubmit failed stages if failed isn't empty, because
             // in that case the event will already have been scheduled.
@@ -1240,10 +1244,17 @@ class DAGScheduler(
     if (errorMessage.isEmpty) {
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+
+      // Clear failure count for this stage, now that it's succeeded.
+      // We only limit consecutive failures of stage attempts,so that if a stage is
+      // re-used many times in a long-running job, unrelated failures don't eventually cause the
+      // stage to be aborted.
+      stage.clearFailures()
     } else {
       stage.latestInfo.stageFailed(errorMessage.get)
       logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
     }
+
     outputCommitCoordinator.stageEnd(stage.id)
     listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
     runningStages -= stage
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 1cf06856ffbc2a38766879d86caadd95995744a4..c086535782c23fdd29bbbde3799f9064cd173717 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] abstract class Stage(
+private[scheduler] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
@@ -92,6 +92,29 @@ private[spark] abstract class Stage(
    */
   private var _latestInfo: StageInfo = StageInfo.fromStage(this, nextAttemptId)
 
+  /**
+   * Set of stage attempt IDs that have failed with a FetchFailure. We keep track of these
+   * failures in order to avoid endless retries if a stage keeps failing with a FetchFailure.
+   * We keep track of each attempt ID that has failed to avoid recording duplicate failures if
+   * multiple tasks from the same stage attempt fail (SPARK-5945).
+   */
+  private val fetchFailedAttemptIds = new HashSet[Int]
+
+  private[scheduler] def clearFailures() : Unit = {
+    fetchFailedAttemptIds.clear()
+  }
+
+  /**
+   * Check whether we should abort the failedStage due to multiple consecutive fetch failures.
+   *
+   * This method updates the running set of failed stage attempts and returns
+   * true if the number of failures exceeds the allowable number of failures.
+   */
+  private[scheduler] def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
+    fetchFailedAttemptIds.add(stageAttemptId)
+    fetchFailedAttemptIds.size >= Stage.MAX_CONSECUTIVE_FETCH_FAILURES
+  }
+
   /** Creates a new attempt for this stage by creating a new StageInfo with a new attempt ID. */
   def makeNewStageAttempt(
       numPartitionsToCompute: Int,
@@ -110,3 +133,8 @@ private[spark] abstract class Stage(
     case _ => false
   }
 }
+
+private[scheduler] object Stage {
+  // The number of consecutive failures allowed before a stage is aborted
+  val MAX_CONSECUTIVE_FETCH_FAILURES = 4
+}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e8688cf41d9975afcff6dacb44d8be16db3451d..62957c66974fb9d3e36f22c87064530ae7081f28 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -26,11 +26,11 @@ import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 import org.apache.spark.util.CallSite
-import org.apache.spark.executor.TaskMetrics
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
   extends DAGSchedulerEventProcessLoop(dagScheduler) {
@@ -473,6 +473,282 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+
+  // Helper function to validate state when creating tests for task failures
+  private def checkStageId(stageId: Int, attempt: Int, stageAttempt: TaskSet) {
+    assert(stageAttempt.stageId === stageId)
+    assert(stageAttempt.stageAttemptId == attempt)
+  }
+
+
+  // Helper functions to extract commonly used code in Fetch Failure test cases
+  private def setupStageAbortTest(sc: SparkContext) {
+    sc.listenerBus.addListener(new EndListener())
+    ended = false
+    jobResult = null
+  }
+
+  // Create a new Listener to confirm that the listenerBus sees the JobEnd message
+  // when we abort the stage. This message will also be consumed by the EventLoggingListener
+  // so this will propagate up to the user.
+  var ended = false
+  var jobResult : JobResult = null
+
+  class EndListener extends SparkListener {
+    override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+      jobResult = jobEnd.jobResult
+      ended = true
+    }
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * successfully.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param numShufflePartitions - The number of partitions in the next stage
+   */
+  private def completeShuffleMapStageSuccessfully(
+      stageId: Int,
+      attemptIdx: Int,
+      numShufflePartitions: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map {
+      case (task, idx) =>
+        (Success, makeMapStatus("host" + ('A' + idx).toChar, numShufflePartitions))
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next stage attempt, confirm it's the one we expect, and complete it
+   * with all FetchFailure.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   * @param shuffleDep - The shuffle dependency of the stage with a fetch failure
+   */
+  private def completeNextStageWithFetchFailure(
+      stageId: Int,
+      attemptIdx: Int,
+      shuffleDep: ShuffleDependency[_, _, _]): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0, idx, "ignored"), null)
+    }.toSeq)
+  }
+
+  /**
+   * Common code to get the next result stage attempt, confirm it's the one we expect, and
+   * complete it with a success where we return 42.
+   *
+   * @param stageId - The current stageId
+   * @param attemptIdx - The current attempt count
+   */
+  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+    val stageAttempt = taskSets.last
+    checkStageId(stageId, attemptIdx, stageAttempt)
+    assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
+    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+  }
+
+  /**
+   * In this test, we simulate a job where many tasks in the same stage fail. We want to show
+   * that many fetch failures inside a single stage attempt do not trigger an abort
+   * on their own, but only when there are enough failing stage attempts.
+   */
+  test("Single stage fetch failure should not abort the stage.") {
+    setupStageAbortTest(sc)
+
+    val parts = 8
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep))
+    submit(reduceRdd, (0 until parts).toArray)
+
+    completeShuffleMapStageSuccessfully(0, 0, numShufflePartitions = parts)
+
+    completeNextStageWithFetchFailure(1, 0, shuffleDep)
+
+    // Resubmit and confirm that now all is well
+    scheduler.resubmitFailedStages()
+
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Complete stage 0 and then stage 1 with a "42"
+    completeShuffleMapStageSuccessfully(0, 1, numShufflePartitions = parts)
+    completeNextResultStageWithSuccess(1, 1)
+
+    // Confirm job finished succesfully
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === (0 until parts).map { idx => idx -> 42 }.toMap)
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we simulate a job failure where the first stage completes successfully and
+   * the second stage fails due to a fetch failure. Multiple successive fetch failures of a stage
+   * trigger an overall job abort to avoid endless retries.
+   */
+  test("Multiple consecutive stage fetch failures should lead to job being aborted.") {
+    setupStageAbortTest(sc)
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail all these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDep)
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+        assert(scheduler.runningStages.nonEmpty)
+        assert(!ended)
+      } else {
+        // Stage should have been aborted and removed from running stages
+        assertDataStructuresEmpty()
+        sc.listenerBus.waitUntilEmpty(1000)
+        assert(ended)
+        jobResult match {
+          case JobFailed(reason) =>
+            assert(reason.getMessage.contains("ResultStage 1 () has failed the maximum"))
+          case other => fail(s"expected JobFailed, not $other")
+        }
+      }
+    }
+  }
+
+  /**
+   * In this test, we create a job with two consecutive shuffles, and simulate 2 failures for each
+   * shuffle fetch. In total In total, the job has had four failures overall but not four failures
+   * for a particular stage, and as such should not be aborted.
+   */
+  test("Failures in different stages should not trigger an overall abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // In the first two iterations, Stage 0 succeeds and stage 1 fails. In the next two iterations,
+    // stage 2 fails.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES) {
+      // Complete all the tasks for the current attempt of stage 0 successfully
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      if (attempt < Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2) {
+        // Now we should have a new taskSet, for a new attempt of stage 1.
+        // Fail all these tasks with FetchFailure
+        completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+      } else {
+        completeShuffleMapStageSuccessfully(1, attempt, numShufflePartitions = 1)
+
+        // Fail stage 2
+        completeNextStageWithFetchFailure(2, attempt - Stage.MAX_CONSECUTIVE_FETCH_FAILURES / 2,
+          shuffleDepTwo)
+      }
+
+      // this will trigger a resubmission of stage 0, since we've lost some of its
+      // map output, for the next iteration through the loop
+      scheduler.resubmitFailedStages()
+    }
+
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 4, numShufflePartitions = 1)
+
+    // Succeed stage2 with a "42"
+    completeNextResultStageWithSuccess(2, Stage.MAX_CONSECUTIVE_FETCH_FAILURES/2)
+
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
+  }
+
+  /**
+   * In this test we demonstrate that only consecutive failures trigger a stage abort. A stage may
+   * fail multiple times, succeed, then fail a few more times (because its run again by downstream
+   * dependencies). The total number of failed attempts for one stage will go over the limit,
+   * but that doesn't matter, since they have successes in the middle.
+   */
+  test("Non-consecutive stage failures don't trigger abort") {
+    setupStageAbortTest(sc)
+
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil).cache()
+    val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne)).cache()
+    val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
+    submit(finalRdd, Array(0))
+
+    // First, execute stages 0 and 1, failing stage 1 up to MAX-1 times.
+    for (attempt <- 0 until Stage.MAX_CONSECUTIVE_FETCH_FAILURES - 1) {
+      // Make each task in stage 0 success
+      completeShuffleMapStageSuccessfully(0, attempt, numShufflePartitions = 2)
+
+      // Now we should have a new taskSet, for a new attempt of stage 1.
+      // Fail these tasks with FetchFailure
+      completeNextStageWithFetchFailure(1, attempt, shuffleDepOne)
+
+      scheduler.resubmitFailedStages()
+
+      // Confirm we have not yet aborted
+      assert(scheduler.runningStages.nonEmpty)
+      assert(!ended)
+    }
+
+    // Rerun stage 0 and 1 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 3, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 3, numShufflePartitions = 1)
+
+    // Fail stage 2 so that stage 1 is resubmitted when we call scheduler.resubmitFailedStages()
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
+
+    scheduler.resubmitFailedStages()
+
+    // Rerun stage 0 to step through the task set
+    completeShuffleMapStageSuccessfully(0, 4, numShufflePartitions = 2)
+
+    // Now again, fail stage 1 (up to MAX_FAILURES) but confirm that this doesn't trigger an abort
+    // since we succeeded in between.
+    completeNextStageWithFetchFailure(1, 4, shuffleDepOne)
+
+    scheduler.resubmitFailedStages()
+
+    // Confirm we have not yet aborted
+    assert(scheduler.runningStages.nonEmpty)
+    assert(!ended)
+
+    // Next, succeed all and confirm output
+    // Rerun stage 0 + 1
+    completeShuffleMapStageSuccessfully(0, 5, numShufflePartitions = 2)
+    completeShuffleMapStageSuccessfully(1, 5, numShufflePartitions = 1)
+
+    // Succeed stage 2 and verify results
+    completeNextResultStageWithSuccess(2, 1)
+
+    assertDataStructuresEmpty()
+    sc.listenerBus.waitUntilEmpty(1000)
+    assert(ended === true)
+    assert(results === Map(0 -> 42))
+  }
+
   test("trivial shuffle with multiple fetch failures") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@@ -810,7 +1086,7 @@ class DAGSchedulerSuite
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
-    // complete stage 2
+    // complete stage 0
     complete(taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
@@ -818,7 +1094,7 @@ class DAGSchedulerSuite
     complete(taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
-    // pretend stage 0 failed because hostA went down
+    // pretend stage 2 failed because hostA went down
     complete(taskSets(2), Seq(
         (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0, "ignored"), null)))
     // TODO assert this: