diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 261b3329a7b9c7ca18838a8ce92b449f82502dae..fcc72ff49276da7378bd2be2eb9daeaf0e9b2077 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime += sustainedSchedulerBacklogTimeoutS * 1000
+      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
       delta
     } else {
       0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4eedaaea61195a2b941e60be2b8832a6a5d680d6..dc82bb7704727ca0229c5861a66c8407629a25a3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
+  // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private var requestedTotalExecutors = 0
+
   // Number of executors requested from the cluster manager that have not registered yet
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
@@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * */
   protected def reset(): Unit = {
     val executors = synchronized {
+      requestedTotalExecutors = 0
       numPendingExecutors = 0
       executorsPendingToRemove.clear()
       Set() ++ executorDataMap.keys
@@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
 
     val response = synchronized {
+      requestedTotalExecutors += numAdditionalExecutors
       numPendingExecutors += numAdditionalExecutors
       logDebug(s"Number of pending executors is now $numPendingExecutors")
+      if (requestedTotalExecutors !=
+          (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+        logDebug(
+          s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
+             |requestedTotalExecutors  = $requestedTotalExecutors
+             |numExistingExecutors     = $numExistingExecutors
+             |numPendingExecutors      = $numPendingExecutors
+             |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+      }
 
       // Account for executors pending to be added or removed
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+      doRequestTotalExecutors(requestedTotalExecutors)
     }
 
     defaultAskTimeout.awaitResult(response)
@@ -524,6 +538,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     val response = synchronized {
+      this.requestedTotalExecutors = numExecutors
       this.localityAwareTasks = localityAwareTasks
       this.hostToLocalTaskCount = hostToLocalTaskCount
 
@@ -589,8 +604,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (!replace) {
-          doRequestTotalExecutors(
-            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
+          if (requestedTotalExecutors !=
+              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+            logDebug(
+              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+                 |requestedTotalExecutors  = $requestedTotalExecutors
+                 |numExistingExecutors     = $numExistingExecutors
+                 |numPendingExecutors      = $numPendingExecutors
+                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+          }
+          doRequestTotalExecutors(requestedTotalExecutors)
         } else {
           numPendingExecutors += knownExecutors.size
           Future.successful(true)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 9839dcf8535db871836ee55f159b81e8f6d87cf4..bf7480d79f8a11b5621dc6d826d81121aaf6abfc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -356,12 +356,13 @@ class StandaloneDynamicAllocationSuite
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
@@ -380,12 +381,13 @@ class StandaloneDynamicAllocationSuite
   test("the pending replacement executors should not be lost (SPARK-10515)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about