diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index fb62682b6c6991a82e5b5a8578b5fcf39cf8b985..aba429bcdca60e8fc0cae1107b9e1c94473c3217 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -114,11 +114,21 @@ package object config {
       .intConf
       .createWithDefault(2)
 
+  private[spark] val MAX_FAILURES_PER_EXEC =
+    ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor")
+      .intConf
+      .createWithDefault(2)
+
   private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
     ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
       .intConf
       .createWithDefault(2)
 
+  private[spark] val MAX_FAILED_EXEC_PER_NODE =
+    ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode")
+      .intConf
+      .createWithDefault(2)
+
   private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
     ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
       .intConf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index fca4c6d37e446c20d046eeacb59457bef4c8a195..bf7a62ea33875f4c3f23733dec6748342406241c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -17,10 +17,274 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * BlacklistTracker is designed to track problematic executors and nodes.  It supports blacklisting
+ * executors and nodes across an entire application (with a periodic expiry).  TaskSetManagers add
+ * additional blacklisting of executors and nodes for individual tasks and stages which works in
+ * concert with the blacklisting here.
+ *
+ * The tracker needs to deal with a variety of workloads, e.g.:
+ *
+ *  * bad user code --  this may lead to many task failures, but that should not count against
+ *      individual executors
+ *  * many small stages -- this may prevent a bad executor from having many failures within any one
+ *      stage, while it still accumulates many failures over the entire application
+ *  * "flaky" executors -- they don't fail every task, but are still faulty enough to merit
+ *      blacklisting
+ *
+ * See the design doc on SPARK-8425 for a more in-depth discussion.
+ *
+ * THREADING: As with most helpers of TaskSchedulerImpl, this is not thread-safe.  Though it is
+ * called by multiple threads, callers must already have a lock on the TaskSchedulerImpl.  The
+ * one exception is [[nodeBlacklist()]], which can be called without holding a lock.
+ */
+private[scheduler] class BlacklistTracker (
+    conf: SparkConf,
+    clock: Clock = new SystemClock()) extends Logging {
+
+  BlacklistTracker.validateBlacklistConfs(conf)
+  private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
+  private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
+  val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
+
+  /**
+   * A map from executorId to information on task failures.  Tracks the time of each task failure,
+   * so that we can avoid blacklisting executors due to failures that are spread very far apart in
+   * time.  We do not proactively remove entries as soon as their failures time out, to avoid the
+   * overhead of doing so.  But the map will not grow too large, because as soon as an executor
+   * gets too many failures, we blacklist the executor and remove its entry here.
+   */
+  private val executorIdToFailureList = new HashMap[String, ExecutorFailureList]()
+  val executorIdToBlacklistStatus = new HashMap[String, BlacklistedExecutor]()
+  val nodeIdToBlacklistExpiryTime = new HashMap[String, Long]()
+  /**
+   * An immutable copy of the set of nodes that are currently blacklisted.  Kept in an
+   * AtomicReference to make [[nodeBlacklist()]] thread-safe.
+   */
+  private val _nodeBlacklist = new AtomicReference[Set[String]](Set())
+  /**
+   * Time when the next blacklist will expire.  Used as a
+   * shortcut to avoid iterating over all entries in the blacklist when none will have expired.
+   */
+  var nextExpiryTime: Long = Long.MaxValue
+  /**
+   * Mapping from nodes to all of the executors that have been blacklisted on that node. We do *not*
+   * remove from this when executors are removed from Spark, so we can track when we get multiple
+   * successive blacklisted executors on one node.  Nonetheless, it will not grow too large, because
+   * only a few executors can be blacklisted on one node before we stop requesting more executors
+   * on that node, and we clean up the list of blacklisted executors once an executor has been
+   * blacklisted for BLACKLIST_TIMEOUT_MILLIS.
+   */
+  val nodeToBlacklistedExecs = new HashMap[String, HashSet[String]]()
+
+  /**
+   * Un-blacklists executors and nodes that have been blacklisted for at least
+   * BLACKLIST_TIMEOUT_MILLIS
+   */
+  def applyBlacklistTimeout(): Unit = {
+    val now = clock.getTimeMillis()
+    // quickly check if we've got anything to expire from blacklist -- if not, avoid doing any work
+    if (now > nextExpiryTime) {
+      // Apply the timeout to blacklisted nodes and executors
+      val execsToUnblacklist = executorIdToBlacklistStatus.filter(_._2.expiryTime < now).keys
+      if (execsToUnblacklist.nonEmpty) {
+        // Un-blacklist any executors that have been blacklisted longer than the blacklist timeout.
+        logInfo(s"Removing executors $execsToUnblacklist from blacklist because the blacklist " +
+          s"for those executors has timed out")
+        execsToUnblacklist.foreach { exec =>
+          val status = executorIdToBlacklistStatus.remove(exec).get
+          val failedExecsOnNode = nodeToBlacklistedExecs(status.node)
+          failedExecsOnNode.remove(exec)
+          if (failedExecsOnNode.isEmpty) {
+            nodeToBlacklistedExecs.remove(status.node)
+          }
+        }
+      }
+      val nodesToUnblacklist = nodeIdToBlacklistExpiryTime.filter(_._2 < now).keys
+      if (nodesToUnblacklist.nonEmpty) {
+        // Un-blacklist any nodes that have been blacklisted longer than the blacklist timeout.
+        logInfo(s"Removing nodes $nodesToUnblacklist from blacklist because the blacklist " +
+          s"has timed out")
+        nodeIdToBlacklistExpiryTime --= nodesToUnblacklist
+        _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+      }
+      updateNextExpiryTime()
+    }
+  }
+
+  private def updateNextExpiryTime(): Unit = {
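+    // Recompute the earliest expiry time across all blacklisted executors and nodes, so that
+    // applyBlacklistTimeout() can cheaply skip any work until something has actually expired.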
+    val execMinExpiry = if (executorIdToBlacklistStatus.nonEmpty) {
+      executorIdToBlacklistStatus.map{_._2.expiryTime}.min
+    } else {
+      Long.MaxValue
+    }
+    val nodeMinExpiry = if (nodeIdToBlacklistExpiryTime.nonEmpty) {
+      nodeIdToBlacklistExpiryTime.values.min
+    } else {
+      Long.MaxValue
+    }
+    nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
+  }
+
+
+  def updateBlacklistForSuccessfulTaskSet(
+      stageId: Int,
+      stageAttemptId: Int,
+      failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
+    // if any tasks failed, we count them towards the overall failure count for the executor at
+    // this point.
+    val now = clock.getTimeMillis()
+    failuresByExec.foreach { case (exec, failuresInTaskSet) =>
+      val appFailuresOnExecutor =
+        executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
+      appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
+      appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
+      val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
+
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+      // If this pushes the total number of failures over the threshold, blacklist the executor.
+      // If it's already blacklisted, we avoid "re-blacklisting" (which can happen if there were
+      // other tasks already running in another taskset when it got blacklisted), because it makes
+      // some of the logic around expiry times a little more confusing.  But it also wouldn't be a
+      // problem to re-blacklist, with a later expiry time.
+      if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
+        logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
+          s" task failures in successful task sets")
+        val node = failuresInTaskSet.node
+        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
+        updateNextExpiryTime()
+
+        // In addition to blacklisting the executor, we also update the data for failures on the
+        // node, and potentially put the entire node into a blacklist as well.
+        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
+        blacklistedExecsOnNode += exec
+        // If the node is already in the blacklist, we avoid adding it again with a later expiry
+        // time.
+        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
+            !nodeIdToBlacklistExpiryTime.contains(node)) {
+          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
+            s"executors blacklisted: ${blacklistedExecsOnNode}")
+          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
+          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+        }
+      }
+    }
+  }
+
+  def isExecutorBlacklisted(executorId: String): Boolean = {
+    executorIdToBlacklistStatus.contains(executorId)
+  }
+
+  /**
+   * Get the full set of nodes that are blacklisted.  Unlike other methods in this class, this *IS*
+   * thread-safe -- no lock on the TaskScheduler is required.
+   */
+  def nodeBlacklist(): Set[String] = {
+    _nodeBlacklist.get()
+  }
+
+  def isNodeBlacklisted(node: String): Boolean = {
+    nodeIdToBlacklistExpiryTime.contains(node)
+  }
+
+  def handleRemovedExecutor(executorId: String): Unit = {
+    // We intentionally do not clean up executors that are already blacklisted in
+    // nodeToBlacklistedExecs, so that if another executor on the same node gets blacklisted, we can
+    // blacklist the entire node.  We also intentionally leave the executor in
+    // executorIdToBlacklistStatus, so that it still gets removed once its blacklist times out.
+    // Despite not clearing those structures here, we don't expect them to grow too large, since
+    // there won't be many executors on one node and the timeout cleans them up periodically anyway.
+    executorIdToFailureList -= executorId
+  }
+
+
+  /**
+   * Tracks all failures for one executor (that have not passed the timeout).
+   *
+   * In general we actually expect this to be extremely small, since it won't contain more than the
+   * maximum number of task failures before an executor is blacklisted (default 2).
+   */
+  private[scheduler] final class ExecutorFailureList extends Logging {
+
+    private case class TaskId(stage: Int, stageAttempt: Int, taskIndex: Int)
+
+    /**
+     * All failures on this executor in successful task sets.
+     */
+    private var failuresAndExpiryTimes = ArrayBuffer[(TaskId, Long)]()
+    /**
+     * As an optimization, we track the min expiry time over all entries in failuresAndExpiryTimes
+     * so it's quick to tell if there are any failures with expiry before the current time.
+     */
+    private var minExpiryTime = Long.MaxValue
+
+    def addFailures(
+        stage: Int,
+        stageAttempt: Int,
+        failuresInTaskSet: ExecutorFailuresInTaskSet): Unit = {
+      failuresInTaskSet.taskToFailureCountAndFailureTime.foreach {
+        case (taskIdx, (_, failureTime)) =>
+          val expiryTime = failureTime + BLACKLIST_TIMEOUT_MILLIS
+          failuresAndExpiryTimes += ((TaskId(stage, stageAttempt, taskIdx), expiryTime))
+          if (expiryTime < minExpiryTime) {
+            minExpiryTime = expiryTime
+          }
+      }
+    }
+
+    /**
+     * The number of unique tasks that failed on this executor.  Only counts failures within the
+     * timeout, and in successful tasksets.
+     */
+    def numUniqueTaskFailures: Int = failuresAndExpiryTimes.size
+
+    def isEmpty: Boolean = failuresAndExpiryTimes.isEmpty
+
+    /**
+     * Apply the timeout to individual tasks.  This is to prevent one-off failures that are very
+     * spread out in time (and likely have nothing to do with problems on the executor) from
+     * triggering blacklisting.  However, note that we do *not* remove executors and nodes from
+     * the blacklist as we expire individual task failures -- each has its own timeout.  E.g.,
+     * suppose:
+     *  * timeout = 10, maxFailuresPerExec = 2
+     *  * Task 1 fails on exec 1 at time 0
+     *  * Task 2 fails on exec 1 at time 5
+     * -->  exec 1 is blacklisted from time 5 to 15.
+     * This is to simplify the implementation, as well as keep the behavior easier to understand
+     * for the end user.
+     */
+    def dropFailuresWithTimeoutBefore(dropBefore: Long): Unit = {
+      if (minExpiryTime < dropBefore) {
+        var newMinExpiry = Long.MaxValue
+        val newFailures = new ArrayBuffer[(TaskId, Long)]
+        failuresAndExpiryTimes.foreach { case (task, expiryTime) =>
+          if (expiryTime >= dropBefore) {
+            newFailures += ((task, expiryTime))
+            if (expiryTime < newMinExpiry) {
+              newMinExpiry = expiryTime
+            }
+          }
+        }
+        failuresAndExpiryTimes = newFailures
+        minExpiryTime = newMinExpiry
+      }
+    }
+
+    override def toString(): String = {
+      s"failures = $failuresAndExpiryTimes"
+    }
+  }
+
+}
 
 private[scheduler] object BlacklistTracker extends Logging {
 
@@ -80,7 +344,9 @@ private[scheduler] object BlacklistTracker extends Logging {
       config.MAX_TASK_ATTEMPTS_PER_EXECUTOR,
       config.MAX_TASK_ATTEMPTS_PER_NODE,
       config.MAX_FAILURES_PER_EXEC_STAGE,
-      config.MAX_FAILED_EXEC_PER_NODE_STAGE
+      config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+      config.MAX_FAILURES_PER_EXEC,
+      config.MAX_FAILED_EXEC_PER_NODE
     ).foreach { config =>
       val v = conf.get(config)
       if (v <= 0) {
@@ -112,3 +378,5 @@ private[scheduler] object BlacklistTracker extends Logging {
     }
   }
 }
+
+private final case class BlacklistedExecutor(node: String, expiryTime: Long)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
index 20ab27d127abae35d97bcb9e2a1c10cb58fabd93..70553d8be28b583ebafdbd200d9d89ac1fac61de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorFailuresInTaskSet.scala
@@ -25,26 +25,30 @@ import scala.collection.mutable.HashMap
 private[scheduler] class ExecutorFailuresInTaskSet(val node: String) {
   /**
    * Mapping from index of the tasks in the taskset, to the number of times it has failed on this
-   * executor.
+   * executor and the most recent failure time.
    */
-  val taskToFailureCount = HashMap[Int, Int]()
+  val taskToFailureCountAndFailureTime = HashMap[Int, (Int, Long)]()
 
-  def updateWithFailure(taskIndex: Int): Unit = {
-    val prevFailureCount = taskToFailureCount.getOrElse(taskIndex, 0)
-    taskToFailureCount(taskIndex) = prevFailureCount + 1
+  def updateWithFailure(taskIndex: Int, failureTime: Long): Unit = {
+    val (prevFailureCount, prevFailureTime) =
+      taskToFailureCountAndFailureTime.getOrElse(taskIndex, (0, -1L))
+    // these times always come from the driver, so we don't need to worry about skew, but might
+    // as well still be defensive in case there is non-monotonicity in the clock
+    val newFailureTime = math.max(prevFailureTime, failureTime)
+    taskToFailureCountAndFailureTime(taskIndex) = (prevFailureCount + 1, newFailureTime)
   }
 
-  def numUniqueTasksWithFailures: Int = taskToFailureCount.size
+  def numUniqueTasksWithFailures: Int = taskToFailureCountAndFailureTime.size
 
   /**
    * Return the number of times this executor has failed on the given task index.
    */
   def getNumTaskFailures(index: Int): Int = {
-    taskToFailureCount.getOrElse(index, 0)
+    taskToFailureCountAndFailureTime.getOrElse(index, (0, 0))._1
   }
 
   override def toString(): String = {
     s"numUniqueTasksWithFailures = $numUniqueTasksWithFailures; " +
-      s"tasksToFailureCount = $taskToFailureCount"
+      s"tasksToFailureCount = $taskToFailureCountAndFailureTime"
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index b03cfe4f0dc49b59b7a12d011960e6deb44be1dc..9a8e313f9ec7b854b828e286d3786b9a21f70278 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,13 +51,28 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
  */
-private[spark] class TaskSchedulerImpl(
+private[spark] class TaskSchedulerImpl private[scheduler](
     val sc: SparkContext,
     val maxTaskFailures: Int,
+    blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging
 {
-  def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
+
+  def this(sc: SparkContext) = {
+    this(
+      sc,
+      sc.conf.get(config.MAX_TASK_FAILURES),
+      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf))
+  }
+
+  def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
+    this(
+      sc,
+      maxTaskFailures,
+      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc.conf),
+      isLocal = isLocal)
+  }
 
   val conf = sc.conf
 
@@ -209,7 +224,7 @@ private[spark] class TaskSchedulerImpl(
   private[scheduler] def createTaskSetManager(
       taskSet: TaskSet,
       maxTaskFailures: Int): TaskSetManager = {
-    new TaskSetManager(this, taskSet, maxTaskFailures)
+    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
   }
 
   override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = synchronized {
@@ -256,6 +271,8 @@ private[spark] class TaskSchedulerImpl(
       availableCpus: Array[Int],
       tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
     var launchedTask = false
+    // nodes and executors that are blacklisted for the entire application have already been
+    // filtered out by this point
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
@@ -308,8 +325,20 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
+    // Before making any offers, un-blacklist any executors and nodes whose blacklist has expired. Do
+    // this here to avoid a separate thread and added synchronization overhead, and also because
+    // updating the blacklist is only relevant when task offers are being made.
+    blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
+
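+    // Filter out offers from executors and nodes that are blacklisted for the entire application.
+    // Blacklisting for individual tasks and stages is applied later, inside each TaskSetManager.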
+    val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
+      offers.filter { offer =>
+        !blacklistTracker.isNodeBlacklisted(offer.host) &&
+          !blacklistTracker.isExecutorBlacklisted(offer.executorId)
+      }
+    }.getOrElse(offers)
+
     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
-    val shuffledOffers = Random.shuffle(offers)
+    val shuffledOffers = Random.shuffle(filteredOffers)
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
@@ -574,6 +603,7 @@ private[spark] class TaskSchedulerImpl(
       executorIdToHost -= executorId
       rootPool.executorLost(executorId, host, reason)
     }
+    blacklistTrackerOpt.foreach(_.handleRemovedExecutor(executorId))
   }
 
   def executorAdded(execId: String, host: String) {
@@ -600,6 +630,14 @@ private[spark] class TaskSchedulerImpl(
     executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
   }
 
+  /**
+   * Get a snapshot of the currently blacklisted nodes for the entire application.  This is
+   * thread-safe -- it can be called without a lock on the TaskScheduler.
+   */
+  def nodeBlacklist(): scala.collection.immutable.Set[String] = {
+    blacklistTrackerOpt.map(_.nodeBlacklist()).getOrElse(scala.collection.immutable.Set())
+  }
+
   // By default, rack is unknown
   def getRackForHost(value: String): Option[String] = None
 
@@ -678,4 +716,13 @@ private[spark] object TaskSchedulerImpl {
 
     retval.toList
   }
+
+  private def maybeCreateBlacklistTracker(conf: SparkConf): Option[BlacklistTracker] = {
+    if (BlacklistTracker.isBlacklistEnabled(conf)) {
+      Some(new BlacklistTracker(conf))
+    } else {
+      None
+    }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
index f4b0f55b7686aa7fd6ce1d2fa4588fab217f9756..e815b7e0cf6c9fed95d0c6b54a0a90e30e1d1bd3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala
@@ -28,6 +28,10 @@ import org.apache.spark.util.Clock
  * (task, executor) / (task, nodes) pairs, and also completely blacklisting executors and nodes
  * for the entire taskset.
  *
+ * It also must store sufficient information about task failures for application-level blacklisting,
+ * which is handled by [[BlacklistTracker]].  Note that BlacklistTracker does not know anything
+ * about task failures until a taskset completes successfully.
+ *
  * THREADING:  This class is a helper to [[TaskSetManager]]; as with the methods in
  * [[TaskSetManager]] this class is designed only to be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
@@ -41,7 +45,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   private val MAX_FAILED_EXEC_PER_NODE_STAGE = conf.get(config.MAX_FAILED_EXEC_PER_NODE_STAGE)
 
   /**
-   * A map from each executor to the task failures on that executor.
+   * A map from each executor to the task failures on that executor.  This is used for blacklisting
+   * within this taskset, and it is also relayed to [[BlacklistTracker]] for app-level
+   * blacklisting if this taskset completes successfully.
    */
   val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
 
@@ -57,9 +63,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
 
   /**
    * Return true if this executor is blacklisted for the given task.  This does *not*
-   * need to return true if the executor is blacklisted for the entire stage.
-   * That is to keep this method as fast as possible in the inner-loop of the
-   * scheduler, where those filters will have already been applied.
+   * need to return true if the executor is blacklisted for the entire stage, or blacklisted
+   * for the entire application.  That is to keep this method as fast as possible in the inner-loop
+   * of the scheduler, where those filters will have already been applied.
    */
   def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
     execToFailures.get(executorId).exists { execFailures =>
@@ -72,10 +78,10 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
   }
 
   /**
-   * Return true if this executor is blacklisted for the given stage.  Completely ignores
-   * anything to do with the node the executor is on.  That
-   * is to keep this method as fast as possible in the inner-loop of the scheduler, where those
-   * filters will already have been applied.
+   * Return true if this executor is blacklisted for the given stage.  Completely ignores whether
+   * the executor is blacklisted for the entire application (or anything to do with the node the
+   * executor is on).  That is to keep this method as fast as possible in the inner-loop of the
+   * scheduler, where those filters will already have been applied.
    */
   def isExecutorBlacklistedForTaskSet(executorId: String): Boolean = {
     blacklistedExecs.contains(executorId)
@@ -90,7 +96,7 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
       exec: String,
       index: Int): Unit = {
     val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
-    execFailures.updateWithFailure(index)
+    execFailures.updateWithFailure(index, clock.getTimeMillis())
 
     // check if this task has also failed on other executors on the same host -- if its gone
     // over the limit, blacklist this task from the entire host.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index f2a432cad33a8b2f86ef12f0578fae8ace4cc32e..3756c216f5ecb696f60c2733bdf5d5d304d6c556 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -51,6 +51,7 @@ private[spark] class TaskSetManager(
     sched: TaskSchedulerImpl,
     val taskSet: TaskSet,
     val maxTaskFailures: Int,
+    blacklistTracker: Option[BlacklistTracker] = None,
     clock: Clock = new SystemClock()) extends Schedulable with Logging {
 
   private val conf = sched.sc.conf
@@ -85,10 +86,8 @@ private[spark] class TaskSetManager(
   var calculatedTasks = 0
 
   private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
-    if (BlacklistTracker.isBlacklistEnabled(conf)) {
-      Some(new TaskSetBlacklist(conf, stageId, clock))
-    } else {
-      None
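+    // Note that a TaskSetBlacklist is only created when application-level blacklisting is enabled,
+    // i.e. when the TaskSchedulerImpl created a BlacklistTracker.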
+    blacklistTracker.map { _ =>
+      new TaskSetBlacklist(conf, stageId, clock)
     }
   }
 
@@ -487,6 +486,12 @@ private[spark] class TaskSetManager(
   private def maybeFinishTaskSet() {
     if (isZombie && runningTasks == 0) {
       sched.taskSetFinished(this)
+      if (tasksSuccessful == numTasks) {
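+        // Only relay these failure counts to the application-level blacklist when every task in
+        // the set succeeded; failures from task sets that ultimately fail (e.g. due to bad user
+        // code) should not count against individual executors.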
+        blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
+          taskSet.stageId,
+          taskSet.stageAttemptId,
+          taskSetBlacklistHelperOpt.get.execToFailures))
+      }
     }
   }
 
@@ -589,6 +594,7 @@ private[spark] class TaskSetManager(
   private[scheduler] def abortIfCompletelyBlacklisted(
       hostToExecutors: HashMap[String, HashSet[String]]): Unit = {
     taskSetBlacklistHelperOpt.foreach { taskSetBlacklist =>
+      val appBlacklist = blacklistTracker.get
       // Only look for unschedulable tasks when at least one executor has registered. Otherwise,
       // task sets will be (unnecessarily) aborted in cases when no executors have registered yet.
       if (hostToExecutors.nonEmpty) {
@@ -615,13 +621,15 @@ private[spark] class TaskSetManager(
           val blacklistedEverywhere = hostToExecutors.forall { case (host, execsOnHost) =>
             // Check if the task can run on the node
             val nodeBlacklisted =
-              taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
-              taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
+              appBlacklist.isNodeBlacklisted(host) ||
+                taskSetBlacklist.isNodeBlacklistedForTaskSet(host) ||
+                taskSetBlacklist.isNodeBlacklistedForTask(host, indexInTaskSet)
             if (nodeBlacklisted) {
               true
             } else {
               // Check if the task can run on any of the executors
               execsOnHost.forall { exec =>
+                appBlacklist.isExecutorBlacklisted(exec) ||
                   taskSetBlacklist.isExecutorBlacklistedForTaskSet(exec) ||
                   taskSetBlacklist.isExecutorBlacklistedForTask(exec, indexInTaskSet)
               }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 0a4f19d76073e158b4897db6b1bccd533eed4d92..02803598097d9d9faa9085ed1e1baa5600616745 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -99,7 +99,8 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RequestExecutors(
       requestedTotal: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int])
+      hostToLocalTaskCount: Map[String, Int],
+      nodeBlacklist: Set[String])
     extends CoarseGrainedClusterMessage
 
   // Check if an executor was force-killed but for a reason unrelated to the running tasks.
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index 915d7a1b8b164bed240f6f57fda6ee12228601de..7b6a2313f9e2a47eb24c05c8ac7389ddaf49fc58 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -272,7 +272,7 @@ private class FakeSchedulerBackend(
 
   protected override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     clusterManagerEndpoint.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount, Set.empty[String]))
   }
 
   protected override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
@@ -291,7 +291,7 @@ private class FakeClusterManager(override val rpcEnv: RpcEnv) extends RpcEndpoin
   def getExecutorIdsToKill: Set[String] = executorIdsToKill.toSet
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case RequestExecutors(requestedTotal, _, _) =>
+    case RequestExecutors(requestedTotal, _, _, _) =>
       targetNumExecutors = requestedTotal
       context.reply(true)
     case KillExecutors(executorIds) =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index b2e7ec5df015cdc8bb232cddedd5a0d1a732044c..6b314d2ae339f092c3cb2193e26b65d1a0f7ab52 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -17,10 +17,356 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark._
 import org.apache.spark.internal.config
+import org.apache.spark.util.ManualClock
+
+class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with MockitoSugar
+    with LocalSparkContext {
+
+  private val clock = new ManualClock(0)
+
+  private var blacklist: BlacklistTracker = _
+  private var scheduler: TaskSchedulerImpl = _
+  private var conf: SparkConf = _
+
+  override def beforeEach(): Unit = {
+    conf = new SparkConf().setAppName("test").setMaster("local")
+      .set(config.BLACKLIST_ENABLED.key, "true")
+    scheduler = mockTaskSchedWithConf(conf)
+
+    clock.setTime(0)
+    blacklist = new BlacklistTracker(conf, clock)
+  }
+
+  override def afterEach(): Unit = {
+    if (blacklist != null) {
+      blacklist = null
+    }
+    if (scheduler != null) {
+      scheduler.stop()
+      scheduler = null
+    }
+    super.afterEach()
+  }
+
+  // All executors and hosts used in tests should be in this set, so that [[assertEquivalentToSet]]
+  // works.  It's OK if it has extraneous entries.
+  val allExecutorAndHostIds = {
+    (('A' to 'Z') ++ (1 to 100).map(_.toString))
+      .flatMap { suffix =>
+        Seq(s"host$suffix", s"host-$suffix", suffix.toString)
+      }
+  }.toSet
+
+  /**
+   * It's easier to write our tests as if we could directly look at the sets of nodes & executors
+   * in the blacklist.  However, the API doesn't expose those sets, so this is a simple way to test
+   * something similar, since we know the universe of values that might appear in these sets.
+   */
+  def assertEquivalentToSet(f: String => Boolean, expected: Set[String]): Unit = {
+    allExecutorAndHostIds.foreach { id =>
+      val actual = f(id)
+      val exp = expected.contains(id)
+      assert(actual === exp, raw"""for string "$id" """)
+    }
+  }
 
-class BlacklistTrackerSuite extends SparkFunSuite {
+  def mockTaskSchedWithConf(conf: SparkConf): TaskSchedulerImpl = {
+    sc = new SparkContext(conf)
+    val scheduler = mock[TaskSchedulerImpl]
+    when(scheduler.sc).thenReturn(sc)
+    when(scheduler.mapOutputTracker).thenReturn(SparkEnv.get.mapOutputTracker)
+    scheduler
+  }
+
+  def createTaskSetBlacklist(stageId: Int = 0): TaskSetBlacklist = {
+    new TaskSetBlacklist(conf, stageId, clock)
+  }
+
+  test("executors can be blacklisted with only a few failures per stage") {
+    // For many different stages, executor 1 fails a task, then executor 2 succeeds the task,
+    // and then the task set is done.  Not enough failures to blacklist the executor *within*
+    // any particular taskset, but we still blacklist the executor overall eventually.
+    // Also, we intentionally have a mix of task successes and failures -- there are even some
+    // successes after the executor is blacklisted.  The idea here is those tasks get scheduled
+    // before the executor is blacklisted.  We might get successes after blacklisting (because the
+    // executor might be flaky but not totally broken).  But successes should not unblacklist the
+    // executor.
+    val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+    var failuresSoFar = 0
+    (0 until failuresUntilBlacklisted * 10).foreach { stageId =>
+      val taskSetBlacklist = createTaskSetBlacklist(stageId)
+      if (stageId % 2 == 0) {
+        // fail one task in every other taskset
+        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+        failuresSoFar += 1
+      }
+      blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+      assert(failuresSoFar == stageId / 2 + 1)
+      if (failuresSoFar < failuresUntilBlacklisted) {
+        assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+      } else {
+        assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+      }
+    }
+  }
+
+  // If an executor has many task failures, but the task set ends up failing, those failures
+  // shouldn't be counted against the executor.
+  test("executors aren't blacklisted as a result of tasks in failed task sets") {
+    val failuresUntilBlacklisted = conf.get(config.MAX_FAILURES_PER_EXEC)
+    // for many different stages, executor 1 fails a task, and then the taskSet fails.
+    (0 until failuresUntilBlacklisted * 10).foreach { stage =>
+      val taskSetBlacklist = createTaskSetBlacklist(stage)
+      taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    }
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+  }
+
+  Seq(true, false).foreach { succeedTaskSet =>
+    val label = if (succeedTaskSet) "success" else "failure"
+    test(s"stage blacklist updates correctly on stage $label") {
+      // Within one taskset, an executor fails a few times, so it's blacklisted for the taskset.
+      // But if the taskset fails, we shouldn't blacklist the executor after the stage.
+      val taskSetBlacklist = createTaskSetBlacklist(0)
+      // We trigger enough failures for both the taskset blacklist, and the application blacklist.
+      val numFailures = math.max(conf.get(config.MAX_FAILURES_PER_EXEC),
+        conf.get(config.MAX_FAILURES_PER_EXEC_STAGE))
+      (0 until numFailures).foreach { index =>
+        taskSetBlacklist.updateBlacklistForFailedTask("hostA", exec = "1", index = index)
+      }
+      assert(taskSetBlacklist.isExecutorBlacklistedForTaskSet("1"))
+      assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+      if (succeedTaskSet) {
+        // The task set succeeded elsewhere, so we should count those failures against our executor,
+        // and it should be blacklisted for the entire application.
+        blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist.execToFailures)
+        assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+      } else {
+        // The task set failed, so we don't count these failures against the executor for other
+        // stages.
+        assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+      }
+    }
+  }
+
+  test("blacklisted executors and nodes get recovered with time") {
+    val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+    // Fail 4 tasks in one task set on executor 1, so that executor gets blacklisted for the whole
+    // application.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+    assert(blacklist.nodeBlacklist() === Set())
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+
+    val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+    // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
+    // application.  Since that's the second executor that is blacklisted on the same node, we also
+    // blacklist that node.
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+    }
+    blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
+    assert(blacklist.nodeBlacklist() === Set("hostA"))
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+
+    // Advance the clock and then make sure hostA and executors 1 and 2 have been removed from the
+    // blacklist.
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+    blacklist.applyBlacklistTimeout()
+    assert(blacklist.nodeBlacklist() === Set())
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+    // Fail one more task, but executor isn't put back into blacklist since the count of failures
+    // on that executor should have been reset to 0.
+    val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
+    assert(blacklist.nodeBlacklist() === Set())
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+  }
+
+  test("blacklist can handle lost executors") {
+    // The blacklist should still work if an executor is killed completely.  We should still
+    // be able to blacklist the entire node.
+    val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+    // Lets say that executor 1 dies completely.  We get some task failures, but
+    // the taskset then finishes successfully (elsewhere).
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = partition)
+    }
+    blacklist.handleRemovedExecutor("1")
+    blacklist.updateBlacklistForSuccessfulTaskSet(
+      stageId = 0,
+      stageAttemptId = 0,
+      taskSetBlacklist0.execToFailures)
+    assert(blacklist.isExecutorBlacklisted("1"))
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+
+    // Now another executor gets spun up on that host, but it also dies.
+    val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+    (0 until 4).foreach { partition =>
+      taskSetBlacklist1.updateBlacklistForFailedTask("hostA", exec = "2", index = partition)
+    }
+    blacklist.handleRemovedExecutor("2")
+    blacklist.updateBlacklistForSuccessfulTaskSet(
+      stageId = 1,
+      stageAttemptId = 0,
+      taskSetBlacklist1.execToFailures)
+    // We've now had two bad executors on hostA, so we should blacklist the entire node.
+    assert(blacklist.isExecutorBlacklisted("1"))
+    assert(blacklist.isExecutorBlacklisted("2"))
+    assert(blacklist.isNodeBlacklisted("hostA"))
+
+    // Advance the clock so that executor 1 should no longer be explicitly blacklisted, but
+    // everything else should still be blacklisted.
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2 + 1)
+    blacklist.applyBlacklistTimeout()
+    assert(!blacklist.isExecutorBlacklisted("1"))
+    assert(blacklist.isExecutorBlacklisted("2"))
+    assert(blacklist.isNodeBlacklisted("hostA"))
+    // make sure we don't leak memory
+    assert(!blacklist.executorIdToBlacklistStatus.contains("1"))
+    assert(!blacklist.nodeToBlacklistedExecs("hostA").contains("1"))
+    // Advance the timeout again so now hostA should be removed from the blacklist.
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS / 2)
+    blacklist.applyBlacklistTimeout()
+    assert(!blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+  }
+
+  test("task failures expire with time") {
+    // Verifies that 2 failures within the timeout period cause an executor to be blacklisted, but
+    // if task failures are spaced out by more than the timeout period, the first failure is timed
+    // out, and the executor isn't blacklisted.
+    var stageId = 0
+    def failOneTaskInTaskSet(exec: String): Unit = {
+      val taskSetBlacklist = createTaskSetBlacklist(stageId = stageId)
+      taskSetBlacklist.updateBlacklistForFailedTask("host-" + exec, exec, 0)
+      blacklist.updateBlacklistForSuccessfulTaskSet(stageId, 0, taskSetBlacklist.execToFailures)
+      stageId += 1
+    }
+    failOneTaskInTaskSet(exec = "1")
+    // We have one sporadic failure on exec 2, but that's it.  Later checks ensure that we never
+    // blacklist executor 2 despite this one failure.
+    failOneTaskInTaskSet(exec = "2")
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+    assert(blacklist.nextExpiryTime === Long.MaxValue)
+
+    // We advance the clock past the expiry time.
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+    val t0 = clock.getTimeMillis()
+    blacklist.applyBlacklistTimeout()
+    assert(blacklist.nextExpiryTime === Long.MaxValue)
+    failOneTaskInTaskSet(exec = "1")
+
+    // Because executor 1's first failure had already expired when its 2nd failure occurred, only
+    // one failure is counted and nothing should have been blacklisted.
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+
+    // Now we add one more failure, within the timeout, and it should be counted.
+    clock.setTime(t0 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+    val t1 = clock.getTimeMillis()
+    failOneTaskInTaskSet(exec = "1")
+    blacklist.applyBlacklistTimeout()
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+    assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+    // Add failures on executor 3, make sure it gets put on the blacklist.
+    clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+    val t2 = clock.getTimeMillis()
+    failOneTaskInTaskSet(exec = "3")
+    failOneTaskInTaskSet(exec = "3")
+    blacklist.applyBlacklistTimeout()
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "3"))
+    assert(blacklist.nextExpiryTime === t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+    // Now we go past the timeout for executor 1, so it should be dropped from the blacklist.
+    clock.setTime(t1 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+    blacklist.applyBlacklistTimeout()
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+    assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+    // Make sure that we update correctly when we go from having blacklisted executors to
+    // just having tasks with timeouts.
+    clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS - 1)
+    failOneTaskInTaskSet(exec = "4")
+    blacklist.applyBlacklistTimeout()
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("3"))
+    assert(blacklist.nextExpiryTime === t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+
+    clock.setTime(t2 + blacklist.BLACKLIST_TIMEOUT_MILLIS + 1)
+    blacklist.applyBlacklistTimeout()
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+    // we've got one task failure still, but we don't bother setting nextExpiryTime to it, to
+    // avoid wasting time checking for expiry of individual task failures.
+    assert(blacklist.nextExpiryTime === Long.MaxValue)
+  }
+
+  test("task failure timeout works as expected for long-running tasksets") {
+    // This ensures that we don't trigger spurious blacklisting for long tasksets, when the taskset
+    // finishes long after the task failures.  We create two tasksets, each with one failure.
+    // Individually they shouldn't cause any blacklisting since there is only one failure.
+    // Furthermore, we space the failures out so far that even when both tasksets have completed,
+    // we still don't trigger any blacklisting.
+    val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+    val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+    // Taskset1 has one failure immediately
+    taskSetBlacklist1.updateBlacklistForFailedTask("host-1", "1", 0)
+    // Then we have a *long* delay, much longer than the timeout, before any other failures or
+    // taskset completion
+    clock.advance(blacklist.BLACKLIST_TIMEOUT_MILLIS * 5)
+    // After the long delay, we have one failure on taskset 2, on the same executor
+    taskSetBlacklist2.updateBlacklistForFailedTask("host-1", "1", 0)
+    // Finally, we complete both tasksets.  It's important here to complete taskset2 *first*.  We
+    // want to make sure that when taskset 1 finishes, even though we've now got two task failures,
+    // we realize that the task failure we just added was well before the timeout.
+    clock.advance(1)
+    blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 2, 0, taskSetBlacklist2.execToFailures)
+    clock.advance(1)
+    blacklist.updateBlacklistForSuccessfulTaskSet(stageId = 1, 0, taskSetBlacklist1.execToFailures)
+
+    // Make sure nothing was blacklisted
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set())
+  }
+
+  test("only blacklist nodes for the application when enough executors have failed on that " +
+    "specific host") {
+    // we blacklist executors on two different hosts -- make sure that doesn't lead to any
+    // node blacklisting
+    val taskSetBlacklist0 = createTaskSetBlacklist(stageId = 0)
+    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 0)
+    taskSetBlacklist0.updateBlacklistForFailedTask("hostA", exec = "1", index = 1)
+    blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist0.execToFailures)
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1"))
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+    val taskSetBlacklist1 = createTaskSetBlacklist(stageId = 1)
+    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 0)
+    taskSetBlacklist1.updateBlacklistForFailedTask("hostB", exec = "2", index = 1)
+    blacklist.updateBlacklistForSuccessfulTaskSet(1, 0, taskSetBlacklist1.execToFailures)
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2"))
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set())
+
+    // Finally, blacklist another executor on the same node as the original blacklisted executor,
+    // and make sure this time we *do* blacklist the node.
+    val taskSetBlacklist2 = createTaskSetBlacklist(stageId = 2)
+    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 0)
+    taskSetBlacklist2.updateBlacklistForFailedTask("hostA", exec = "3", index = 1)
+    blacklist.updateBlacklistForSuccessfulTaskSet(2, 0, taskSetBlacklist2.execToFailures)
+    assertEquivalentToSet(blacklist.isExecutorBlacklisted(_), Set("1", "2", "3"))
+    assertEquivalentToSet(blacklist.isNodeBlacklisted(_), Set("hostA"))
+  }
 
   test("blacklist still respects legacy configs") {
     val conf = new SparkConf().setMaster("local")
@@ -68,6 +414,8 @@ class BlacklistTrackerSuite extends SparkFunSuite {
       config.MAX_TASK_ATTEMPTS_PER_NODE,
       config.MAX_FAILURES_PER_EXEC_STAGE,
       config.MAX_FAILED_EXEC_PER_NODE_STAGE,
+      config.MAX_FAILURES_PER_EXEC,
+      config.MAX_FAILED_EXEC_PER_NODE,
       config.BLACKLIST_TIMEOUT_CONF
     ).foreach { config =>
       conf.set(config.key, "0")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a0b62683312ae4f9012a16026546a6e170bd7d6b..304dc9d47e268ccd7d13fa6411e9c250d485d05c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -21,14 +21,15 @@ import java.nio.ByteBuffer
 
 import scala.collection.mutable.HashMap
 
-import org.mockito.Matchers.{anyInt, anyString, eq => meq}
-import org.mockito.Mockito.{atLeast, atMost, never, spy, verify, when}
+import org.mockito.Matchers.{anyInt, anyObject, anyString, eq => meq}
+import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.storage.BlockManagerId
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start() {}
@@ -44,6 +45,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   var failedTaskSetReason: String = null
   var failedTaskSet = false
 
+  var blacklist: BlacklistTracker = null
   var taskScheduler: TaskSchedulerImpl = null
   var dagScheduler: DAGScheduler = null
 
@@ -82,11 +84,12 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   def setupSchedulerWithMockTaskSetBlacklist(): TaskSchedulerImpl = {
+    blacklist = mock[BlacklistTracker]
     val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
     conf.set(config.BLACKLIST_ENABLED, true)
     sc = new SparkContext(conf)
     taskScheduler =
-      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
         override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
           val tsm = super.createTaskSetManager(taskSet, maxFailures)
           // we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -408,6 +411,95 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       }
       assert(tsm.isZombie)
     }
+
+    // The task sets complete, so the tracker should be notified of the successful ones.
+    verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+      stageId = 0,
+      stageAttemptId = 0,
+      failuresByExec = stageToMockTaskSetBlacklist(0).execToFailures)
+    verify(blacklist, times(1)).updateBlacklistForSuccessfulTaskSet(
+      stageId = 1,
+      stageAttemptId = 0,
+      failuresByExec = stageToMockTaskSetBlacklist(1).execToFailures)
+    // but we shouldn't update for the failed taskset
+    verify(blacklist, never).updateBlacklistForSuccessfulTaskSet(
+      stageId = meq(2),
+      stageAttemptId = anyInt(),
+      failuresByExec = anyObject())
+  }
+
+  test("scheduled tasks obey node and executor blacklists") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    (0 to 2).foreach { stageId =>
+      val taskSet = FakeTask.createTaskSet(numTasks = 2, stageId = stageId, stageAttemptId = 0)
+      taskScheduler.submitTasks(taskSet)
+    }
+
+    val offers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", 1),
+      new WorkerOffer("executor1", "host1", 1),
+      new WorkerOffer("executor2", "host1", 1),
+      new WorkerOffer("executor3", "host2", 10),
+      new WorkerOffer("executor4", "host3", 1)
+    )
+
+    // Set up our mock blacklist:
+    // host1, executor0 & executor3 are completely blacklisted.
+    // This covers everything *except* one core on executor4 / host3, so that some tasks can still
+    // be scheduled.
+    when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+    when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+    when(blacklist.isExecutorBlacklisted("executor3")).thenReturn(true)
+
+    val stageToTsm = (0 to 2).map { stageId =>
+      val tsm = taskScheduler.taskSetManagerForAttempt(stageId, 0).get
+      stageId -> tsm
+    }.toMap
+
+    val firstTaskAttempts = taskScheduler.resourceOffers(offers).flatten
+    firstTaskAttempts.foreach { task => logInfo(s"scheduled $task on ${task.executorId}") }
+    assert(firstTaskAttempts.size === 1)
+    assert(firstTaskAttempts.head.executorId === "executor4")
+    ('0' until '2').foreach { hostNum =>
+      verify(blacklist, atLeast(1)).isNodeBlacklisted("host" + hostNum)
+    }
+  }
+
+  test("abort stage when all executors are blacklisted") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+    val taskSet = FakeTask.createTaskSet(numTasks = 10, stageAttemptId = 0)
+    taskScheduler.submitTasks(taskSet)
+    val tsm = stageToMockTaskSetManager(0)
+
+    // first just submit some offers so the scheduler knows about all the executors
+    taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 2),
+      WorkerOffer("executor1", "host0", 2),
+      WorkerOffer("executor2", "host0", 2),
+      WorkerOffer("executor3", "host1", 2)
+    ))
+
+    // now say our blacklist updates to blacklist a bunch of resources, but *not* everything
+    when(blacklist.isNodeBlacklisted("host1")).thenReturn(true)
+    when(blacklist.isExecutorBlacklisted("executor0")).thenReturn(true)
+
+    // make an offer on the blacklisted resources.  We won't schedule anything, but also won't
+    // abort yet, since we know of other resources that work
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 2),
+      WorkerOffer("executor3", "host1", 2)
+    )).flatten.size === 0)
+    assert(!tsm.isZombie)
+
+    // now update the blacklist so that everything really is blacklisted
+    when(blacklist.isExecutorBlacklisted("executor1")).thenReturn(true)
+    when(blacklist.isExecutorBlacklisted("executor2")).thenReturn(true)
+    assert(taskScheduler.resourceOffers(IndexedSeq(
+      WorkerOffer("executor0", "host0", 2),
+      WorkerOffer("executor3", "host1", 2)
+    )).flatten.size === 0)
+    assert(tsm.isZombie)
+    verify(tsm).abort(anyString(), anyObject())
   }
 
   /**
@@ -650,6 +742,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
   }
 
+  test("scheduler checks for executors that can be expired from blacklist") {
+    taskScheduler = setupSchedulerWithMockTaskSetBlacklist()
+
+    taskScheduler.submitTasks(FakeTask.createTaskSet(1, 0))
+    taskScheduler.resourceOffers(IndexedSeq(
+      new WorkerOffer("executor0", "host0", 1)
+    )).flatten
+
+    verify(blacklist).applyBlacklistTimeout()
+  }
+
   test("if an executor is lost then the state for its running tasks is cleaned up (SPARK-18553)") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
index 8c902af5685ffd758839327f2af265f8c8c48e08..6b52c10b2c68b0567604967d1d1aa3b91e721a5a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetBlacklistSuite.scala
@@ -85,9 +85,9 @@ class TaskSetBlacklistSuite extends SparkFunSuite {
 
     Seq("exec1", "exec2").foreach { exec =>
       assert(
-        execToFailures(exec).taskToFailureCount === Map(
-          0 -> 1,
-          1 -> 1
+        execToFailures(exec).taskToFailureCountAndFailureTime === Map(
+          0 -> (1, 0),
+          1 -> (1, 0)
         )
       )
     }
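
The assertion above reflects the renamed per-executor bookkeeping: each task index now maps to a
(failure count, time of most recent failure) pair rather than a bare count (the time is 0 here
because the test's ManualClock never advances). A minimal, hypothetical sketch of that structure,
using only the field name visible in the test (the real class in the patch carries more state):

import scala.collection.mutable.HashMap

// Simplified stand-in: per-executor failure bookkeeping, keyed by task index.
class ExecutorFailuresSketch {
  val taskToFailureCountAndFailureTime = HashMap[Int, (Int, Long)]()

  // Record one more failure of the given task, remembering when it happened.
  def updateWithFailure(taskIndex: Int, failureTime: Long): Unit = {
    val (prevCount, _) = taskToFailureCountAndFailureTime.getOrElse(taskIndex, (0, 0L))
    taskToFailureCountAndFailureTime(taskIndex) = (prevCount + 1, failureTime)
  }
}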
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index abc8fff30edc8d5cd6259b2afc435945c0d3dc11..2f5b029a966f122e67fca82f6304fb5cabaa7e0e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -183,7 +183,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     val accumUpdates = taskSet.tasks.head.metrics.internalAccums
 
     // Offer a host with NO_PREF as the constraint,
@@ -236,7 +236,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // An executor that is not NODE_LOCAL should be rejected.
     assert(manager.resourceOffer("execC", "host2", ANY) === None)
@@ -257,7 +257,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq()   // Last task has no locality prefs
     )
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
     assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
@@ -286,7 +286,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq()   // Last task has no locality prefs
     )
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
     assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
@@ -306,7 +306,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host2"))
     )
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -344,7 +344,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host3"))
     )
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
@@ -376,7 +376,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
 
@@ -393,7 +393,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
     // after the last failure.
@@ -426,7 +426,10 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, 4, clock)
+    // We don't directly use the application blacklist, but its presence triggers blacklisting
+    // within the taskset.
+    val blacklistTrackerOpt = Some(new BlacklistTracker(conf, clock))
+    val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)
 
     {
       val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
@@ -515,7 +518,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host2", "execC")),
       Seq())
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // Only ANY is valid
     assert(manager.myLocalityLevels.sameElements(Array(NO_PREF, ANY)))
     // Add a new executor
@@ -546,7 +549,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execB")),
       Seq(TaskLocation("host2", "execC")),
       Seq())
-    val manager = new TaskSetManager(sched, taskSet, 1, new ManualClock)
+    val manager = new TaskSetManager(sched, taskSet, 1, clock = new ManualClock)
     sched.addExecutor("execA", "host1")
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
@@ -579,7 +582,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host1", "execA")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
     // Set allowed locality to ANY
@@ -670,7 +673,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(),
       Seq(TaskLocation("host3", "execC")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
     assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
@@ -698,7 +701,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(),
       Seq(TaskLocation("host3")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // node-local tasks are scheduled without delay
     assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
@@ -720,7 +723,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
       Seq(ExecutorCacheTaskLocation("host2", "execB")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // process-local tasks are scheduled first
     assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2)
@@ -740,7 +743,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(ExecutorCacheTaskLocation("host1", "execA")),
       Seq(ExecutorCacheTaskLocation("host2", "execB")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
 
     // process-local tasks are scheduled first
     assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1)
@@ -760,7 +763,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host1", "execA")),
       Seq(TaskLocation("host2", "execB.1")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // Only ANY is valid
     assert(manager.myLocalityLevels.sameElements(Array(ANY)))
     // Add a new executor
@@ -794,7 +797,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       Seq(TaskLocation("host2")),
       Seq(TaskLocation("hdfs_cache_host3")))
     val clock = new ManualClock
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
     sched.removeExecutor("execA")
     manager.executorAdded()
@@ -822,7 +825,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
     sc.conf.set("spark.speculation.multiplier", "0.0")
     val clock = new ManualClock()
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
@@ -876,7 +879,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sc.conf.set("spark.speculation.multiplier", "0.0")
     sc.conf.set("spark.speculation.quantile", "0.6")
     val clock = new ManualClock()
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
       task.metrics.internalAccums
     }
@@ -980,17 +983,17 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 0)
-    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, new ManualClock)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock)
     assert(manager.name === "TaskSet_0.0")
 
     // Make sure a task set with the same stage ID but different attempt ID has a unique name
     val taskSet2 = FakeTask.createTaskSet(numTasks = 1, stageId = 0, stageAttemptId = 1)
-    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, new ManualClock)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
     assert(manager2.name === "TaskSet_0.1")
 
     // Make sure a task set with the same attempt ID but different stage ID also has a unique name
     val taskSet3 = FakeTask.createTaskSet(numTasks = 1, stageId = 1, stageAttemptId = 1)
-    val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, new ManualClock)
+    val manager3 = new TaskSetManager(sched, taskSet3, MAX_TASK_FAILURES, clock = new ManualClock)
     assert(manager3.name === "TaskSet_1.1")
   }
 
diff --git a/docs/configuration.md b/docs/configuration.md
index 7e466d7dc15ed58015cd690d2fc199746909a549..07bcd4aa7f3047a877f93f9a9c12ab38176499df 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1315,6 +1315,14 @@ Apart from these, the following properties are also available, and may be useful
     other "spark.blacklist" configuration options.
   </td>
 </tr>
+<tr>
+  <td><code>spark.blacklist.timeout</code></td>
+  <td>1h</td>
+  <td>
+    (Experimental) How long a node or executor is blacklisted for the entire application, before it
+    is unconditionally removed from the blacklist to attempt running new tasks.
+  </td>
+</tr>
 <tr>
   <td><code>spark.blacklist.task.maxTaskAttemptsPerExecutor</code></td>
   <td>1</td>
@@ -1347,6 +1355,28 @@ Apart from these, the following properties are also available, and may be useful
     the entire node is marked as failed for the stage.
   </td>
 </tr>
+<tr>
+  <td><code>spark.blacklist.application.maxFailedTasksPerExecutor</code></td>
+  <td>2</td>
+  <td>
+    (Experimental) How many different tasks must fail on one executor, in successful task sets,
+    before the executor is blacklisted for the entire application.  Blacklisted executors will
+    be automatically added back to the pool of available resources after the timeout specified by
+    <code>spark.blacklist.timeout</code>.  Note that with dynamic allocation, though, the executors
+    may get marked as idle and be reclaimed by the cluster manager.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.blacklist.application.maxFailedExecutorsPerNode</code></td>
+  <td>2</td>
+  <td>
+    (Experimental) How many different executors must be blacklisted for the entire application,
+    before the node is blacklisted for the entire application.  Blacklisted nodes will
+    be automatically added back to the pool of available resources after the timeout specified by
+    <code>spark.blacklist.timeout</code>.  Note that with dynamic allocation, though, the executors
+    on the node may get marked as idle and be reclaimed by the cluster manager.
+  </td>
+</tr>
 <tr>
   <td><code>spark.speculation</code></td>
   <td>false</td>
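
A minimal sketch of setting the new properties from user code (spark.blacklist.enabled is the
master switch for the feature; the values here are arbitrary examples, not recommendations):

import org.apache.spark.SparkConf

// Example only: enable blacklisting and tune the application-level thresholds.
val conf = new SparkConf()
  .setAppName("blacklist-example")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.timeout", "30m")
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "2")
  .set("spark.blacklist.application.maxFailedExecutorsPerNode", "2")

// Or equivalently on the command line, e.g.:
//   spark-submit --conf spark.blacklist.enabled=true --conf spark.blacklist.timeout=30m ...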
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 0378ef4fac7c502507475c02a27009ce899c05e6..f79c66b9ff8a93b156adf5283f418411cb68504f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -692,11 +692,11 @@ private[spark] class ApplicationMaster(
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-      case RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount) =>
+      case r: RequestExecutors =>
         Option(allocator) match {
           case Some(a) =>
-            if (a.requestTotalExecutorsWithPreferredLocalities(requestedTotal,
-              localityAwareTasks, hostToLocalTaskCount)) {
+            if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
+              r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
               resetAllocatorInterval()
             }
             context.reply(true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 0b66d1cf08eac468c68a23f52fb5b7d2d7b3b01a..e498932e51ed5b8ea80a48e019ce8e754db08d40 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -114,6 +114,8 @@ private[yarn] class YarnAllocator(
   @volatile private var targetNumExecutors =
     YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
 
+  private var currentNodeBlacklist = Set.empty[String]
+
   // Executor loss reason requests that are pending - maps from executor ID for inquiry to a
   // list of requesters that should be responded to once we find out why the given executor
   // was lost.
@@ -217,18 +219,35 @@ private[yarn] class YarnAllocator(
    * @param localityAwareTasks number of locality aware tasks to be used as container placement hint
    * @param hostToLocalTaskCount a map of preferred hostname to possible task counts to be used as
    *                             container placement hint.
+   * @param nodeBlacklist a set of blacklisted nodes, which is passed in to avoid allocating new
+   *                      containers on them. It will be used to update the application master's
+   *                      blacklist.
    * @return Whether the new requested total is different than the old value.
    */
   def requestTotalExecutorsWithPreferredLocalities(
       requestedTotal: Int,
       localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean = synchronized {
+      hostToLocalTaskCount: Map[String, Int],
+      nodeBlacklist: Set[String]): Boolean = synchronized {
     this.numLocalityAwareTasks = localityAwareTasks
     this.hostToLocalTaskCounts = hostToLocalTaskCount
 
     if (requestedTotal != targetNumExecutors) {
       logInfo(s"Driver requested a total number of $requestedTotal executor(s).")
       targetNumExecutors = requestedTotal
+
+      // Update blacklist information to YARN ResourceManager for this application,
+      // in order to avoid allocating new Containers on the problematic nodes.
+      val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
+      val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
+      if (blacklistAdditions.nonEmpty) {
+        logInfo(s"adding nodes to YARN application master's blacklist: $blacklistAdditions")
+      }
+      if (blacklistRemovals.nonEmpty) {
+        logInfo(s"removing nodes from YARN application master's blacklist: $blacklistRemovals")
+      }
+      amClient.updateBlacklist(blacklistAdditions.toList.asJava, blacklistRemovals.toList.asJava)
+      currentNodeBlacklist = nodeBlacklist
       true
     } else {
       false
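
The hunk above sends YARN only the *changes* to the node blacklist (additions and removals
relative to what was last reported), which is also what the new YarnAllocatorSuite test below
verifies. A stand-alone sketch of that delta bookkeeping, with a print stub standing in for
amClient.updateBlacklist:

object BlacklistDeltaSketch {
  private var currentNodeBlacklist = Set.empty[String]

  // Hypothetical stand-in for amClient.updateBlacklist(additions, removals).
  private def updateBlacklist(additions: Seq[String], removals: Seq[String]): Unit =
    println(s"additions=$additions, removals=$removals")

  // Diff the new blacklist against the last one reported, then remember it.
  def sync(nodeBlacklist: Set[String]): Unit = {
    val blacklistAdditions = nodeBlacklist -- currentNodeBlacklist
    val blacklistRemovals = currentNodeBlacklist -- nodeBlacklist
    updateBlacklist(blacklistAdditions.toList, blacklistRemovals.toList)
    currentNodeBlacklist = nodeBlacklist
  }
}

// BlacklistDeltaSketch.sync(Set("hostA"))          // additions=List(hostA), removals=List()
// BlacklistDeltaSketch.sync(Set("hostA", "hostB")) // additions=List(hostB), removals=List()
// BlacklistDeltaSketch.sync(Set.empty)             // additions=List(), removals=List(hostA, hostB)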
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 2f9ea1911fd616a11d06a89f2f6576ef8c31a9db..cbc6e60e839c1fb181929a9483d7596245e8b91b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -121,13 +121,21 @@ private[spark] abstract class YarnSchedulerBackend(
     }
   }
 
+  private[cluster] def prepareRequestExecutors(requestedTotal: Int): RequestExecutors = {
+    val nodeBlacklist: Set[String] = scheduler.nodeBlacklist()
+    // For locality preferences, ignore preferences for nodes that are blacklisted
+    val filteredHostToLocalTaskCount =
+      hostToLocalTaskCount.filter { case (k, v) => !nodeBlacklist.contains(k) }
+    RequestExecutors(requestedTotal, localityAwareTasks, filteredHostToLocalTaskCount,
+      nodeBlacklist)
+  }
+
   /**
    * Request executors from the ApplicationMaster by specifying the total number desired.
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
-    yarnSchedulerEndpointRef.ask[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+    yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
   }
 
   /**
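
A small illustrative snippet (hypothetical host names) of what prepareRequestExecutors does to the
locality hints before sending the request to the ApplicationMaster:

// Locality preferences on blacklisted nodes are dropped from the request.
val hostToLocalTaskCount = Map("host1" -> 3, "host2" -> 1, "host3" -> 2)
val nodeBlacklist = Set("host2")
val filteredHostToLocalTaskCount =
  hostToLocalTaskCount.filter { case (host, _) => !nodeBlacklist.contains(host) }
// filteredHostToLocalTaskCount == Map("host1" -> 3, "host3" -> 2)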
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 994dc75d34c304c9e31c54f89f26343a9c2c2026..331bad4fd875d64188e60d2407c173e6a58f7d0d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.yarn
 
 import java.util.{Arrays, List => JList}
 
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic
 import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
@@ -90,7 +92,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     override def equals(other: Any): Boolean = false
   }
 
-  def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+  def createAllocator(
+      maxExecutors: Int = 5,
+      rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = {
     val args = Array(
       "--jar", "somejar.jar",
       "--class", "SomeClass")
@@ -202,7 +206,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
     handler.updateResourceRequests()
     handler.getPendingAllocate.size should be (3)
 
@@ -213,7 +217,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
     handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty)
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map.empty, Set.empty)
     handler.updateResourceRequests()
     handler.getPendingAllocate.size should be (1)
   }
@@ -224,7 +228,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumExecutorsRunning should be (0)
     handler.getPendingAllocate.size should be (4)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty)
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map.empty, Set.empty)
     handler.updateResourceRequests()
     handler.getPendingAllocate.size should be (3)
 
@@ -234,7 +238,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
 
     handler.getNumExecutorsRunning should be (2)
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
     handler.updateResourceRequests()
     handler.getPendingAllocate.size should be (0)
     handler.getNumExecutorsRunning should be (2)
@@ -250,7 +254,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map.empty, Set.empty)
     handler.executorIdToContainer.keys.foreach { id => handler.killExecutor(id ) }
 
     val statuses = Seq(container1, container2).map { c =>
@@ -272,7 +276,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val container2 = createContainer("host2")
     handler.handleAllocatedContainers(Array(container1, container2))
 
-    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map())
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set.empty)
 
     val statuses = Seq(container1, container2).map { c =>
       ContainerStatus.newInstance(c.getId(), ContainerState.COMPLETE, "Failed", -1)
@@ -286,6 +290,21 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     handler.getNumUnexpectedContainerRelease should be (2)
   }
 
+  test("blacklisted nodes reflected in amClient requests") {
+    // Internally we track the set of blacklisted nodes, but YARN wants us to send *changes*
+    // to the blacklist.  This makes sure we are sending the right updates.
+    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+    val handler = createAllocator(4, mockAmClient)
+    handler.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set("hostA"))
+    verify(mockAmClient).updateBlacklist(Seq("hostA").asJava, Seq().asJava)
+
+    handler.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set("hostA", "hostB"))
+    verify(mockAmClient).updateBlacklist(Seq("hostB").asJava, Seq().asJava)
+
+    handler.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
+    verify(mockAmClient).updateBlacklist(Seq().asJava, Seq("hostA", "hostB").asJava)
+  }
+
   test("memory exceeded diagnostic regexes") {
     val diagnostics =
       "Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ffa0b58ee715e739d0bd83c900a3af9da8b88281
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackendSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster
+
+import org.mockito.Mockito.when
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.serializer.JavaSerializer
+
+class YarnSchedulerBackendSuite extends SparkFunSuite with MockitoSugar with LocalSparkContext {
+
+  test("RequestExecutors reflects node blacklist and is serializable") {
+    sc = new SparkContext("local", "YarnSchedulerBackendSuite")
+    val sched = mock[TaskSchedulerImpl]
+    when(sched.sc).thenReturn(sc)
+    val yarnSchedulerBackend = new YarnSchedulerBackend(sched, sc) {
+      def setHostToLocalTaskCount(hostToLocalTaskCount: Map[String, Int]): Unit = {
+        this.hostToLocalTaskCount = hostToLocalTaskCount
+      }
+    }
+    val ser = new JavaSerializer(sc.conf).newInstance()
+    for {
+      blacklist <- IndexedSeq(Set[String](), Set("a", "b", "c"))
+      numRequested <- 0 until 10
+      hostToLocalCount <- IndexedSeq(
+        Map[String, Int](),
+        Map("a" -> 1, "b" -> 2)
+      )
+    } {
+      yarnSchedulerBackend.setHostToLocalTaskCount(hostToLocalCount)
+      when(sched.nodeBlacklist()).thenReturn(blacklist)
+      val req = yarnSchedulerBackend.prepareRequestExecutors(numRequested)
+      assert(req.requestedTotal === numRequested)
+      assert(req.nodeBlacklist === blacklist)
+      assert(req.hostToLocalTaskCount.keySet.intersect(blacklist).isEmpty)
+      // Serialize to make sure serialization doesn't throw an error
+      ser.serialize(req)
+    }
+  }
+
+}