diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 2d89232ba2a0aad420ca7f1c98d086aff768a88e..eeb7963c9e610e3e42d6629d56041a7a600984af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class TaskInfo(
     val taskId: Long,
+    /**
+     * The index of this task within its task set. Not necessarily the same as the ID of the RDD
+     * partition that the task is computing.
+     */
     val index: Int,
     val attemptNumber: Int,
     val launchTime: Long,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 821e3ee7f13548d714c76a7886a1f6653f2473f5..2ce49ca1345f226a6857ddb06e6be020e59baeeb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
+    if (!launchedTask) {
+      taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+    }
     return launchedTask
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2eedd201ca35509304ef2b2c9bf31ad5c8d712f4..2fef447b0a3c159c31710fb4c0afa10c0eb3dc80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
-  // key is taskId, value is a Map of executor id to when it failed
+  // key is taskId (aka TaskInfo.index), value is a Map of executor id to when it failed
   private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
@@ -270,7 +270,7 @@ private[spark] class TaskSetManager(
    * Is this re-execution of a failed task on an executor it already failed in before
    * EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
    */
-  private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
+  private[scheduler] def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
     if (failedExecutors.contains(taskId)) {
       val failed = failedExecutors.get(taskId).get
 
@@ -575,6 +575,56 @@ private[spark] class TaskSetManager(
     index
   }
 
+  /**
+   * Check whether the given task set has been blacklisted to the point that it can't run anywhere.
+   *
+   * It is possible that this taskset has become impossible to schedule *anywhere* due to the
+   * blacklist.  The most common scenario would be if there are fewer executors than
+   * spark.task.maxFailures. We need to detect this so we can fail the task set, otherwise the job
+   * will hang.
+   *
+   * There's a tradeoff here: we could make sure all tasks in the task set are schedulable, but that
+   * would add extra time to each iteration of the scheduling loop. Here, we take the approach of
+   * making sure at least one of the unscheduled tasks is schedulable. This means we may not detect
+   * the hang as quickly as we could have, but we'll always detect the hang eventually, and the
+   * method is faster in the typical case. In the worst case, this method can take
+   * O(maxTaskFailures + numTasks) time, but it will be faster when there haven't been any task
+   * failures (this is because the method picks on unscheduled task, and then iterates through each
+   * executor until it finds one that the task hasn't failed on already).
+   */
+  private[scheduler] def abortIfCompletelyBlacklisted(executors: Iterable[String]): Unit = {
+
+    val pendingTask: Option[Int] = {
+      // usually this will just take the last pending task, but because of the lazy removal
+      // from each list, we may need to go deeper in the list.  We poll from the end because
+      // failed tasks are put back at the end of allPendingTasks, so we're more likely to find
+      // an unschedulable task this way.
+      val indexOffset = allPendingTasks.lastIndexWhere { indexInTaskSet =>
+        copiesRunning(indexInTaskSet) == 0 && !successful(indexInTaskSet)
+      }
+      if (indexOffset == -1) {
+        None
+      } else {
+        Some(allPendingTasks(indexOffset))
+      }
+    }
+
+    // If no executors have registered yet, don't abort the stage, just wait.  We probably
+    // got here because a task set was added before the executors registered.
+    if (executors.nonEmpty) {
+      // take any task that needs to be scheduled, and see if we can find some executor it *could*
+      // run on
+      pendingTask.foreach { taskId =>
+        if (executors.forall(executorIsBlacklisted(_, taskId))) {
+          val execs = executors.toIndexedSeq.sorted.mkString("(", ",", ")")
+          val partition = tasks(taskId).partitionId
+          abort(s"Aborting ${taskSet} because task $taskId (partition $partition)" +
+            s" has already failed on executors $execs, and no other executors are available.")
+        }
+      }
+    }
+  }
+
   /**
    * Marks the task as getting result and notifies the DAG Scheduler
    */
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 3e69894b102695e7c879c58551b210de8d20afc6..683eeeeb6d6614cdd6d0d9cc74521e2415fcc22d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -53,7 +53,7 @@ class ExecutorSuite extends SparkFunSuite {
     when(mockEnv.closureSerializer).thenReturn(serializer)
     val serializedTask =
       Task.serializeWithDependencies(
-        new FakeTask(0),
+        new FakeTask(0, 0),
         HashMap[String, Long](),
         HashMap[String, Long](),
         serializer.newInstance())
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 8ba2697dd9d6bee01069a656a5958fa7ec1b81b4..14c8b664d4d8b5230d64223a275e198d011e7df9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -57,7 +57,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
   testScheduler(
     "With blacklist on, job will still fail if there are too many bad executors on bad host",
     extraConfs = Seq(
-      // just set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
       ("spark.scheduler.executorTaskBlacklistTime", "10000000")
     )
   ) {
@@ -74,7 +75,8 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
   testScheduler(
     "Bad node with multiple executors, job will still succeed with the right confs",
     extraConfs = Seq(
-      // just set this to something much longer than the test duration
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
       ("spark.scheduler.executorTaskBlacklistTime", "10000000"),
       // this has to be higher than the number of executors on the bad host
       ("spark.task.maxFailures", "5"),
@@ -91,6 +93,33 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM
     assertDataStructuresEmpty(noFailure = true)
   }
 
+  // Make sure that if we've failed on all executors, but haven't hit task.maxFailures yet, the job
+  // doesn't hang
+  testScheduler(
+    "SPARK-15865 Progress with fewer executors than maxTaskFailures",
+    extraConfs = Seq(
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000",
+      "spark.testing.nHosts" -> "2",
+      "spark.testing.nExecutorsPerHost" -> "1",
+      "spark.testing.nCoresPerExecutor" -> "1"
+    )
+  ) {
+    def runBackend(): Unit = {
+      val (taskDescription, _) = backend.beginTask()
+      backend.taskFailed(taskDescription, new RuntimeException("test task failure"))
+    }
+    withBackend(runBackend _) {
+      val jobFuture = submit(new MockRDD(sc, 10, Nil), (0 until 10).toArray)
+      Await.ready(jobFuture, duration)
+      val pattern = ("Aborting TaskSet 0.0 because task .* " +
+        "already failed on executors \\(.*\\), and no other executors are available").r
+      assert(pattern.findFirstIn(failure.getMessage).isDefined,
+        s"Couldn't find $pattern in ${failure.getMessage()}")
+    }
+    assertDataStructuresEmpty(noFailure = false)
+  }
 }
 
 class MultiExecutorMockBackend(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 4fe705b201ec810dc62edefc53e22fa54341f79d..87600fe504b987a313bb5905098edf46dd5d0dcc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -21,7 +21,8 @@ import org.apache.spark.TaskContext
 
 class FakeTask(
     stageId: Int,
-    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, 0) {
+    partitionId: Int,
+    prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0, partitionId) {
   override def runTask(context: TaskContext): Int = 0
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
@@ -40,7 +41,7 @@ object FakeTask {
       throw new IllegalArgumentException("Wrong number of task locations")
     }
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+      new FakeTask(0, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
     new TaskSet(tasks, 0, stageAttemptId, 0, null)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 467796d7c24b01f6582b83060a2599d7c26631bd..00e1c447ccbef075539725f784825244602afa87 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -30,7 +30,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
   def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
     : TaskSetManager = {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, Nil)
+      new FakeTask(stageId, i, Nil)
     }
     new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 634b94f4c3111b2f6dbf85cd28ea913bb163879d..14f52a6be9d1f9a492494346d939dc951c42d173 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -279,12 +279,6 @@ private[spark] abstract class MockBackend(
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
   private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
 
-  reviveThread.scheduleAtFixedRate(new Runnable {
-    override def run(): Unit = Utils.tryLogNonFatalError {
-      reviveOffers()
-    }
-  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
-
   /**
    * Test backends should call this to get a task that has been assigned to them by the scheduler.
    * Each task should be responded to with either [[taskSuccess]] or [[taskFailed]].
@@ -348,7 +342,13 @@ private[spark] abstract class MockBackend(
     assignedTasksWaitingToRun.nonEmpty
   }
 
-  override def start(): Unit = {}
+  override def start(): Unit = {
+    reviveThread.scheduleAtFixedRate(new Runnable {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        reviveOffers()
+      }
+    }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
+  }
 
   override def stop(): Unit = {
     reviveThread.shutdown()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 34b8d55339ccdd538e124cae76ab4056f72daa91..100b15740ca92749f161f534157a58fe009309fd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -281,6 +281,98 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
 
+  test("abort stage if executor loss results in unschedulability from previously failed tasks") {
+    // Make sure we can detect when a taskset becomes unschedulable from a blacklisting.  This
+    // test explores a particular corner case -- you may have one task fail, but still be
+    // schedulable on another executor.  However, that executor may fail later on, leaving the
+    // first task with no place to run.
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+    val firstTaskAttempts = taskScheduler.resourceOffers(Seq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1)
+    )).flatten
+    assert(Set("executor0", "executor1") === firstTaskAttempts.map(_.executorId).toSet)
+
+    // fail one of the tasks, but leave the other running
+    val failedTask = firstTaskAttempts.find(_.executorId == "executor0").get
+    taskScheduler.handleFailedTask(tsm, failedTask.taskId, TaskState.FAILED, TaskResultLost)
+    // at this point, our failed task could run on the other executor, so don't give up the task
+    // set yet.
+    assert(!failedTaskSet)
+
+    // Now we fail our second executor.  The other task can still run on executor1, so make an offer
+    // on that executor, and make sure that the other task (not the failed one) is assigned there
+    taskScheduler.executorLost("executor1", SlaveLost("oops"))
+    val nextTaskAttempts =
+      taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1))).flatten
+    // Note: Its OK if some future change makes this already realize the taskset has become
+    // unschedulable at this point (though in the current implementation, we're sure it will not)
+    assert(nextTaskAttempts.size === 1)
+    assert(nextTaskAttempts.head.executorId === "executor0")
+    assert(nextTaskAttempts.head.attemptNumber === 1)
+    assert(nextTaskAttempts.head.index != failedTask.index)
+
+    // now we should definitely realize that our task set is unschedulable, because the only
+    // task left can't be scheduled on any executors due to the blacklist
+    taskScheduler.resourceOffers(Seq(new WorkerOffer("executor0", "host0", 1)))
+    sc.listenerBus.waitUntilEmpty(100000)
+    assert(tsm.isZombie)
+    assert(failedTaskSet)
+    val idx = failedTask.index
+    assert(failedTaskSetReason == s"Aborting TaskSet 0.0 because task $idx (partition $idx) has " +
+      s"already failed on executors (executor0), and no other executors are available.")
+  }
+
+  test("don't abort if there is an executor available, though it hasn't had scheduled tasks yet") {
+    // interaction of SPARK-15865 & SPARK-16106
+    // if we have a small number of tasks, we might be able to schedule them all on the first
+    // executor.  But if those tasks fail, we should still realize there is another executor
+    // available and not bail on the job
+
+    val taskScheduler = setupScheduler(
+      // set this to something much longer than the test duration so that executors don't get
+      // removed from the blacklist during the test
+      "spark.scheduler.executorTaskBlacklistTime" -> "10000000"
+    )
+
+    val taskSet = FakeTask.createTaskSet(2, (0 until 2).map { _ => Seq(TaskLocation("host0")) }: _*)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = taskScheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get
+
+    val offers = Seq(
+      // each offer has more than enough free cores for the entire task set, so when combined
+      // with the locality preferences, we schedule all tasks on one executor
+      new WorkerOffer("executor0", "host0", 4),
+      new WorkerOffer("executor1", "host1", 4)
+    )
+    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    assert(firstTaskAttempts.size == 2)
+    firstTaskAttempts.foreach { taskAttempt => assert("executor0" === taskAttempt.executorId) }
+
+    // fail all the tasks on the bad executor
+    firstTaskAttempts.foreach { taskAttempt =>
+      taskScheduler.handleFailedTask(tsm, taskAttempt.taskId, TaskState.FAILED, TaskResultLost)
+    }
+
+    // Here is the main check of this test -- we have the same offers again, and we schedule it
+    // successfully.  Because the scheduler first tries to schedule with locality in mind, at first
+    // it won't schedule anything on executor1.  But despite that, we don't abort the job.  Then the
+    // scheduler tries for ANY locality, and successfully schedules tasks on executor1.
+    val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    assert(secondTaskAttempts.size == 2)
+    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
+    assert(!failedTaskSet)
+  }
+
   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()