diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 462c1890fd8dfb2dc01ff01f8f627fb067c60930..be63c637a3a13a1cbfc47f4ca53adab47be9fb73 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -149,6 +149,11 @@ package object config {
+    ConfigBuilder("spark.blacklist.application.fetchFailure.enabled")
+      .booleanConf
+      .createWithDefault(false)
   // End blacklist confs
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index e130e609e4f6348396a555ad64d96e84e14eb5a5..cd8e61d6d02081b221ed4e8d0ba90bf49a2caea7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -61,6 +61,7 @@ private[scheduler] class BlacklistTracker (
   private val MAX_FAILURES_PER_EXEC = conf.get(config.MAX_FAILURES_PER_EXEC)
   private val MAX_FAILED_EXEC_PER_NODE = conf.get(config.MAX_FAILED_EXEC_PER_NODE)
   val BLACKLIST_TIMEOUT_MILLIS = BlacklistTracker.getBlacklistTimeout(conf)
    * A map from executorId to information on task failures.  Tracks the time of each task failure,
@@ -145,6 +146,74 @@ private[scheduler] class BlacklistTracker (
     nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
+  private def killBlacklistedExecutor(exec: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing blacklisted executor id $exec " +
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+          a.killExecutors(Seq(exec), true, true)
+        case None =>
+          logWarning(s"Not attempting to kill blacklisted executor id $exec " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+  private def killExecutorsOnBlacklistedNode(node: String): Unit = {
+    if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
+      allocationClient match {
+        case Some(a) =>
+          logInfo(s"Killing all executors on blacklisted host $node " +
+            s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
+          if (a.killExecutorsOnHost(node) == false) {
+            logError(s"Killing executors on node $node failed.")
+          }
+        case None =>
+          logWarning(s"Not attempting to kill executors on blacklisted host $node " +
+            s"since allocation client is not defined.")
+      }
+    }
+  }
+  def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
+      // If we blacklist on fetch failures, we are implicitly saying that we believe the failure is
+      // non-transient, and can't be recovered from (even if this is the first fetch failure,
+      // stage is retried after just one failure, so we don't always get a chance to collect
+      // multiple fetch failures).
+      // If the external shuffle-service is on, then every other executor on this node would
+      // be suffering from the same issue, so we should blacklist (and potentially kill) all
+      // of them immediately.
+      val now = clock.getTimeMillis()
+      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
+      if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+        if (!nodeIdToBlacklistExpiryTime.contains(host)) {
+          logInfo(s"blacklisting node $host due to fetch failure of external shuffle service")
+          nodeIdToBlacklistExpiryTime.put(host, expiryTimeForNewBlacklists)
+          listenerBus.post(SparkListenerNodeBlacklisted(now, host, 1))
+          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
+          killExecutorsOnBlacklistedNode(host)
+          updateNextExpiryTime()
+        }
+      } else if (!executorIdToBlacklistStatus.contains(exec)) {
+        logInfo(s"Blacklisting executor $exec due to fetch failure")
+        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
+        // We hardcoded number of failure tasks to 1 for fetch failure, because there's no
+        // reattempt for such failure.
+        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, 1))
+        updateNextExpiryTime()
+        killBlacklistedExecutor(exec)
+        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(exec, HashSet[String]())
+        blacklistedExecsOnNode += exec
+      }
+    }
+  }
   def updateBlacklistForSuccessfulTaskSet(
       stageId: Int,
@@ -174,17 +243,7 @@ private[scheduler] class BlacklistTracker (
         listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
-        if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-          allocationClient match {
-            case Some(allocationClient) =>
-              logInfo(s"Killing blacklisted executor id $exec " +
-                s"since spark.blacklist.killBlacklistedExecutors is set.")
-              allocationClient.killExecutors(Seq(exec), true, true)
-            case None =>
-              logWarning(s"Not attempting to kill blacklisted executor id $exec " +
-                s"since allocation client is not defined.")
-          }
-        }
+        killBlacklistedExecutor(exec)
         // In addition to blacklisting the executor, we also update the data for failures on the
         // node, and potentially put the entire node into a blacklist as well.
@@ -199,19 +258,7 @@ private[scheduler] class BlacklistTracker (
           nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
           listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
-          if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
-            allocationClient match {
-              case Some(allocationClient) =>
-                logInfo(s"Killing all executors on blacklisted host $node " +
-                  s"since spark.blacklist.killBlacklistedExecutors is set.")
-                if (allocationClient.killExecutorsOnHost(node) == false) {
-                  logError(s"Killing executors on node $node failed.")
-                }
-              case None =>
-                logWarning(s"Not attempting to kill executors on blacklisted host $node " +
-                  s"since allocation client is not defined.")
-            }
-          }
+          killExecutorsOnBlacklistedNode(node)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index bba0b294f1afb7b22b4a079743a9e72c6f5d1cf4..91ec172ffeda1f264637ce1177b6734df60a4ae9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -51,29 +51,21 @@ import org.apache.spark.util.{AccumulatorV2, ThreadUtils, Utils}
  * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
  * we are holding a lock on ourselves.
-private[spark] class TaskSchedulerImpl private[scheduler](
+private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    private[scheduler] val blacklistTrackerOpt: Option[BlacklistTracker],
     isLocal: Boolean = false)
   extends TaskScheduler with Logging {
   import TaskSchedulerImpl._
   def this(sc: SparkContext) = {
-    this(
-      sc,
-      sc.conf.get(config.MAX_TASK_FAILURES),
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc))
+    this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
-  def this(sc: SparkContext, maxTaskFailures: Int, isLocal: Boolean) = {
-    this(
-      sc,
-      maxTaskFailures,
-      TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
-      isLocal = isLocal)
-  }
+  // Lazily initializing blackListTrackOpt to avoid getting empty ExecutorAllocationClient,
+  // because ExecutorAllocationClient is created after this TaskSchedulerImpl.
+  private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)
   val conf = sc.conf
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a41b059fa7decff7ed278c64d383c999852d20e6..02d374dc37cd5f6e5ae022c8ac855d4a303a718e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -774,6 +774,12 @@ private[spark] class TaskSetManager(
           tasksSuccessful += 1
         isZombie = true
+        if (fetchFailed.bmAddress != null) {
+          blacklistTracker.foreach(_.updateBlacklistForFetchFailure(
+            fetchFailed.bmAddress.host, fetchFailed.bmAddress.executorId))
+        }
       case ef: ExceptionFailure =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index 571c6bbb4585d811d2be09173b4ef4829303b848..7ff03c44b0611dbe70561bf0510a092283af7711 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -530,4 +530,59 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+  test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
+    val allocationClientMock = mock[ExecutorAllocationClient]
+    when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
+      // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
+      // is updated before we ask the executor allocation client to kill all the executors
+      // on a particular host.
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        if (blacklist.nodeBlacklist.contains("hostA") == false) {
+          throw new IllegalStateException("hostA should be on the blacklist")
+        }
+        true
+      }
+    })
+    conf.set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    // Disable auto-kill. Blacklist an executor and make sure killExecutors is not called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, false)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+    // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
+    conf.set(config.BLACKLIST_KILL_ENABLED, true)
+    blacklist = new BlacklistTracker(listenerBusMock, conf, Some(allocationClientMock), clock)
+    clock.advance(1000)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
+    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock, never).killExecutorsOnHost(any())
+    assert(blacklist.executorIdToBlacklistStatus.contains("1"))
+    assert(blacklist.executorIdToBlacklistStatus("1").node === "hostA")
+    assert(blacklist.executorIdToBlacklistStatus("1").expiryTime ===
+      1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nodeIdToBlacklistExpiryTime.isEmpty)
+    // Enable external shuffle service to see if all the executors on this node will be killed.
+    conf.set(config.SHUFFLE_SERVICE_ENABLED, true)
+    clock.advance(1000)
+    blacklist.updateBlacklistForFetchFailure("hostA", exec = "2")
+    verify(allocationClientMock, never).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutorsOnHost("hostA")
+    assert(blacklist.nodeIdToBlacklistExpiryTime.contains("hostA"))
+    assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
+      2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+    assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
+  }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 8b9d45f734cda345176f98170ace26db7d61dce5..a00337776dadcdbd0b0b1cce74d3f9857c1293c1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -87,7 +87,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     conf.set(config.BLACKLIST_ENABLED, true)
     sc = new SparkContext(conf)
     taskScheduler =
-      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4), Some(blacklist)) {
+      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
         override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = {
           val tsm = super.createTaskSetManager(taskSet, maxFailures)
           // we need to create a spied tsm just so we can set the TaskSetBlacklist
@@ -98,6 +98,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
           stageToMockTaskSetBlacklist(taskSet.stageId) = taskSetBlacklist
+        override private[scheduler] lazy val blacklistTrackerOpt = Some(blacklist)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index db14c9acfdce55eba3453a3c18c96135de7df634..80fb674725814d2b00a1d885aeab483e0d073b02 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1140,6 +1140,38 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       .updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
+  test("update application blacklist for shuffle-fetch") {
+    // Setup a taskset, and fail some one task for fetch failure.
+    val conf = new SparkConf()
+      .set(config.BLACKLIST_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
+      .set(config.BLACKLIST_FETCH_FAILURE_ENABLED, true)
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    val blacklistTracker = new BlacklistTracker(sc, None)
+    val tsm = new TaskSetManager(sched, taskSet, 4, Some(blacklistTracker))
+    // make some offers to our taskset, to get tasks we will fail
+    val taskDescs = Seq(
+      "exec1" -> "host1",
+      "exec2" -> "host2"
+    ).flatMap { case (exec, host) =>
+      // offer each executor twice (simulating 2 cores per executor)
+      (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)}
+    }
+    assert(taskDescs.size === 4)
+    assert(!blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
+    assert(!blacklistTracker.isNodeBlacklisted("host1"))
+    // Fail the task with fetch failure
+    tsm.handleFailedTask(taskDescs(0).taskId, TaskState.FAILED,
+      FetchFailed(BlockManagerId(taskDescs(0).executorId, "host1", 12345), 0, 0, 0, "ignored"))
+    assert(blacklistTracker.isNodeBlacklisted("host1"))
+  }
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
diff --git a/docs/configuration.md b/docs/configuration.md
index f4bec589208be6b6d76fe1987c24d7258a15040a..c8e61537a457c59ac0be10bef194a4ef3d39614f 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1479,6 +1479,15 @@ Apart from these, the following properties are also available, and may be useful
     all of the executors on that node will be killed.
+  <td><code>spark.blacklist.application.fetchFailure.enabled</code></td>
+  <td>false</td>
+  <td>
+    (Experimental) If set to "true", Spark will blacklist the executor immediately when a fetch 
+    failure happenes. If external shuffle service is enabled, then the whole node will be 
+    blacklisted.
+  </td>