diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 9112d93a86b2a34e4c58901cd8c1a944a3b98fda..63d87b4cd385c79ee9ac93ceff8aed985665f95b 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+    *                     to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
   /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6c59038f2a6c146d026d37f46149908b9c133019..189d91333c045b187ec5a54bbea092b3be8c1a2a 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    conf: SparkConf)
+    conf: SparkConf,
+    blockManagerMaster: BlockManagerMaster)
   extends Logging {
 
   allocationManager =>
@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new SystemClock()
 
   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener
+  val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
   private val executor =
@@ -334,6 +336,11 @@ private[spark] class ExecutorAllocationManager(
 
       // If the new target has not changed, avoid sending a message to the cluster manager
       if (numExecutorsTarget < oldNumExecutorsTarget) {
+        // We lower the target number of executors but don't actively kill any yet.  Killing is
+        // controlled separately by an idle timeout.  It's still helpful to reduce the target number
+        // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+        // preempts it) -- in that case, there is no point in trying to immediately  get a new
+        // executor, since we wouldn't even use it yet.
         client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
         logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
           s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
@@ -455,7 +462,10 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased.
+      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+        countFailures = false, force = false)
     }
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
@@ -575,7 +585,7 @@ private[spark] class ExecutorAllocationManager(
         // Note that it is not necessary to query the executors since all the cached
         // blocks we are concerned with are reported to the driver. Note that this
         // does not include broadcast blocks.
-        val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+        val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
         val now = clock.getTimeMillis()
         val timeout = {
           if (hasCachedBlocks) {
@@ -610,7 +620,7 @@ private[spark] class ExecutorAllocationManager(
    * This class is intentionally conservative in its assumptions about the relative ordering
    * and consistency of events returned by the listener.
    */
-  private class ExecutorAllocationListener extends SparkListener {
+  private[spark] class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     // Number of running tasks per stage including speculative tasks.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dc531e333701491f81ac221c2e20753ccd569337..5e8595603cc9088c25cb0a82b9c071018c697645 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -534,7 +534,8 @@ class SparkContext(config: SparkConf) extends Logging {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+              _env.blockManager.master))
           case _ =>
             None
         }
@@ -1633,6 +1634,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
    *
+   * This is not supported when dynamic allocation is turned on.
+   *
    * @note This is an indication to the cluster manager that the application wishes to adjust
    * its resource usage downwards. If the application wishes to replace the executors it kills
    * through this method with new ones, it should follow up explicitly with a call to
@@ -1644,7 +1647,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
+          "killExecutors() unsupported with Dynamic Allocation turned on")
+        b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
@@ -1682,7 +1688,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = true,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index cd8e61d6d02081b221ed4e8d0ba90bf49a2caea7..952598f6de19d4d3aa23b8fb3c5556c0e113752f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -152,7 +152,8 @@ private[scheduler] class BlacklistTracker (
         case Some(a) =>
           logInfo(s"Killing blacklisted executor id $exec " +
             s"since ${config.BLACKLIST_KILL_ENABLED.key} is set.")
-          a.killExecutors(Seq(exec), true, true)
+          a.killExecutors(Seq(exec), adjustTargetNumExecutors = false, countFailures = false,
+            force = true)
         case None =>
           logWarning(s"Not attempting to kill blacklisted executor id $exec " +
             s"since allocation client is not defined.")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4d75063fbf1c56b35d4616f779c8e3b255b83091..5627a557a12f3c6be7bd41f8c0904f1c0e80ed07 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -147,7 +147,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
       case KillExecutorsOnHost(host) =>
         scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-          killExecutors(exec.toSeq, replace = true, force = true)
+          killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+            force = true)
         }
 
       case UpdateDelegationTokens(newDelegationTokens) =>
@@ -584,18 +585,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
@@ -610,7 +611,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }
 
       logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")
 
@@ -618,12 +619,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // with the cluster manager to avoid allocating new ones. When computing the new target,
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
-        if (!replace) {
+        if (adjustTargetNumExecutors) {
           requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
           if (requestedTotalExecutors !=
               (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
             logDebug(
-              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
+                 |Executor counts do not match:
                  |requestedTotalExecutors  = $requestedTotalExecutors
                  |numExistingExecutors     = $numExistingExecutors
                  |numPendingExecutors      = $numPendingExecutors
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a0cae5a9e011cf23cf0d277edab6fd36f216e2a4..9807d1269e3d46afb9d42bd46efc33941694f8a3 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark
 
 import scala.collection.mutable
 
+import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.executor.TaskMetrics
@@ -26,6 +28,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.ManualClock
 
 /**
@@ -1050,6 +1053,66 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager) === Map.empty)
   }
 
+  test("SPARK-23365 Don't update target num executors when killing idle executors") {
+    val minExecutors = 1
+    val initialExecutors = 1
+    val maxExecutors = 2
+    val conf = new SparkConf()
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
+      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
+      .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString)
+      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms")
+      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1000ms")
+      .set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms")
+    val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
+    val mockBMM = mock(classOf[BlockManagerMaster])
+    val manager = new ExecutorAllocationManager(
+      mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM)
+    val clock = new ManualClock()
+    manager.setClock(clock)
+
+    when(mockAllocationClient.requestTotalExecutors(meq(2), any(), any())).thenReturn(true)
+    // test setup -- job with 2 tasks, scale up to two executors
+    assert(numExecutorsTarget(manager) === 1)
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+    manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    assert(numExecutorsTarget(manager) === 2)
+    val taskInfo0 = createTaskInfo(0, 0, "executor-1")
+    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty)))
+    val taskInfo1 = createTaskInfo(1, 1, "executor-2")
+    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1))
+    assert(numExecutorsTarget(manager) === 2)
+
+    // have one task finish -- we should adjust the target number of executors down
+    // but we should *not* kill any executors yet
+    manager.listener.onTaskEnd(SparkListenerTaskEnd(0, 0, null, Success, taskInfo0, null))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
+    clock.advance(1000)
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    assert(numExecutorsTarget(manager) === 1)
+    verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any())
+
+    // now we cross the idle timeout for executor-1, so we kill it.  the really important
+    // thing here is that we do *not* ask the executor allocation client to adjust the target
+    // number of executors down
+    when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false))
+      .thenReturn(Seq("executor-1"))
+    clock.advance(3000)
+    schedule(manager)
+    assert(maxNumExecutorsNeeded(manager) === 1)
+    assert(numExecutorsTarget(manager) === 1)
+    // here's the important verify -- we did kill the executors, but did not adjust the target count
+    verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
@@ -1268,7 +1331,8 @@ private class DummyLocalSchedulerBackend (sc: SparkContext, sb: SchedulerBackend
 
   override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = executorIds
 
   override def start(): Unit = sb.start()
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index c21ee7d26f8ca290734633ba87747eb92b742c25..27cc47496c8059e3c512cd2b4658faf6744d54e6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -573,7 +573,8 @@ class StandaloneDynamicAllocationSuite
     syncExecutors(sc)
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutors(Seq(executorId), replace = false, force)
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = true, countFailures = false,
+          force)
       case _ => fail("expected coarse grained scheduler")
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index afebcdd7b9e314cacdb622bb7cbe4f1cd38ca660..06d7afaaff55c9f06d61e452f4a77dd997262142 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -479,7 +479,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
 
   test("blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
       // is updated before we ask the executor allocation client to kill all the executors
@@ -517,7 +517,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist1.execToFailures)
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
@@ -533,7 +533,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist2.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
 
     val taskSetBlacklist3 = createTaskSetBlacklist(stageId = 1)
     // Fail 4 tasks in one task set on executor 2, so that executor gets blacklisted for the whole
@@ -545,13 +545,13 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     }
     blacklist.updateBlacklistForSuccessfulTaskSet(0, 0, taskSetBlacklist3.execToFailures)
 
-    verify(allocationClientMock).killExecutors(Seq("2"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("2"), false, false, true)
     verify(allocationClientMock).killExecutorsOnHost("hostA")
   }
 
   test("fetch failure blacklisting kills executors, configured by BLACKLIST_KILL_ENABLED") {
     val allocationClientMock = mock[ExecutorAllocationClient]
-    when(allocationClientMock.killExecutors(any(), any(), any())).thenReturn(Seq("called"))
+    when(allocationClientMock.killExecutors(any(), any(), any(), any())).thenReturn(Seq("called"))
     when(allocationClientMock.killExecutorsOnHost("hostA")).thenAnswer(new Answer[Boolean] {
       // To avoid a race between blacklisting and killing, it is important that the nodeBlacklist
       // is updated before we ask the executor allocation client to kill all the executors
@@ -571,7 +571,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     conf.set(config.BLACKLIST_KILL_ENABLED, false)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock, never).killExecutors(any(), any(), any())
+    verify(allocationClientMock, never).killExecutors(any(), any(), any(), any())
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     // Enable auto-kill. Blacklist an executor and make sure killExecutors is called.
@@ -580,7 +580,7 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M
     clock.advance(1000)
     blacklist.updateBlacklistForFetchFailure("hostA", exec = "1")
 
-    verify(allocationClientMock).killExecutors(Seq("1"), true, true)
+    verify(allocationClientMock).killExecutors(Seq("1"), false, false, true)
     verify(allocationClientMock, never).killExecutorsOnHost(any())
 
     assert(blacklist.executorIdToBlacklistStatus.contains("1"))