diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 34c32ce312964f05f71075cb36489d40825ac5d0..6176e258989db6f2209c1bd7638479f3214f57c4 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -89,6 +89,8 @@ private[spark] class ExecutorAllocationManager(
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
     Integer.MAX_VALUE)
+  private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
+    minNumExecutors)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -121,8 +123,7 @@ private[spark] class ExecutorAllocationManager(
 
   // The desired number of executors at this moment in time. If all our executors were to die, this
   // is the number of executors we would immediately want from the cluster manager.
-  private var numExecutorsTarget =
-    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+  private var numExecutorsTarget = initialNumExecutors
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -240,6 +241,19 @@ private[spark] class ExecutorAllocationManager(
     executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
+  /**
+   * Reset the allocation manager to its initial state. Currently this is only called in
+   * yarn-client mode, when the AM re-registers after a failure.
+   */
+  def reset(): Unit = synchronized {
+    initializing = true
+    numExecutorsTarget = initialNumExecutors
+    numExecutorsToAdd = 1
+
+    executorsPendingToRemove.clear()
+    removeTimes.clear()
+  }
+
   /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
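Note on the hunks above: extracting `initialNumExecutors` into a val lets `reset()` restore the same construction-time target that `numExecutorsTarget` started from, instead of re-reading the conf. A minimal, self-contained sketch of the reset contract (hypothetical class and names, not Spark's actual code):

```scala
import scala.collection.mutable

// Hypothetical stand-in for the allocation manager's mutable state.
class AllocationState(initialTarget: Int) {
  private var initializing = true
  private var numExecutorsTarget = initialTarget
  private var numExecutorsToAdd = 1
  private val executorsPendingToRemove = mutable.HashSet.empty[String]
  private val removeTimes = mutable.HashMap.empty[String, Long]

  // Mirrors the patch: every field returns to its construction-time value;
  // per-executor bookkeeping is simply cleared, since the old executors are
  // assumed gone after an AM failure.
  def reset(): Unit = synchronized {
    initializing = true
    numExecutorsTarget = initialTarget
    numExecutorsToAdd = 1
    executorsPendingToRemove.clear()
    removeTimes.clear()
  }
}
```
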
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 505c161141c8801873571f7699ae8a9f3e39ddef..7efe16749e59d31d2e7f13c733153f57dc474961 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -341,6 +341,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  /**
+   * Reset the state of CoarseGrainedSchedulerBackend to its initial state. Currently this is
+   * only called in yarn-client mode, when the AM re-registers after a failure and dynamic
+   * allocation is enabled.
+   */
+  protected def reset(): Unit = synchronized {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+
+      // Remove all the lingering executors that should have been removed but have not been yet,
+      // either because the disconnected event has not been received or the executors died silently.
+      executorDataMap.toMap.foreach { case (eid, _) =>
+        driverEndpoint.askWithRetry[Boolean](
+          RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+      }
+    }
+  }
+
   override def reviveOffers() {
     driverEndpoint.send(ReviveOffers)
   }
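A detail worth calling out in `reset()` above: the loop iterates over `executorDataMap.toMap`, an immutable snapshot, presumably because handling each `RemoveExecutor` message mutates `executorDataMap` itself. A self-contained sketch (hypothetical names) of why the snapshot matters:

```scala
import scala.collection.mutable

object SnapshotIteration {
  def main(args: Array[String]): Unit = {
    val executors = mutable.HashMap("exec-1" -> "data", "exec-2" -> "data")

    // Handler that mutates the live map, like the RemoveExecutor message handler.
    def removeExecutor(id: String): Unit = executors -= id

    // Iterating over the .toMap snapshot keeps the loop well-defined even though
    // removeExecutor shrinks the underlying map on every iteration.
    executors.toMap.foreach { case (id, _) => removeExecutor(id) }

    assert(executors.isEmpty)
  }
}
```
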
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 116f027a0f987a4f0c7a9d391e3ffc1c51085732..fedfbd547b91b6a3634b346e2c62b29d81b26c75 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -805,6 +805,89 @@ class ExecutorAllocationManagerSuite
     assert(maxNumExecutorsNeeded(manager) === 1)
   }
 
+  test("reset the state of allocation manager") {
+    sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Reset while requests to add executors have been sent, but no executors have been
+    // reported as added yet.
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 4)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) === 5)
+
+    manager.reset()
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorIds(manager) === Set.empty)
+
+    // Reset the allocation manager after executors have been added.
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+    addExecutors(manager)
+    addExecutors(manager)
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 5)
+
+    onExecutorAdded(manager, "first")
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    onExecutorAdded(manager, "fifth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+    // Losing the cluster manager makes all live executors lost, so simulate that behavior here.
+    onExecutorRemoved(manager, "first")
+    onExecutorRemoved(manager, "second")
+    onExecutorRemoved(manager, "third")
+    onExecutorRemoved(manager, "fourth")
+    onExecutorRemoved(manager, "fifth")
+
+    manager.reset()
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorIds(manager) === Set.empty)
+    assert(removeTimes(manager) === Map.empty)
+
+    // Reset the allocation manager while executors are pending removal.
+    addExecutors(manager)
+    addExecutors(manager)
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 5)
+
+    onExecutorAdded(manager, "first")
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    onExecutorAdded(manager, "fifth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+    removeExecutor(manager, "first")
+    removeExecutor(manager, "second")
+    assert(executorsPendingToRemove(manager) === Set("first", "second"))
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+    // Losing the cluster manager makes all live executors lost, so simulate that behavior here.
+    onExecutorRemoved(manager, "first")
+    onExecutorRemoved(manager, "second")
+    onExecutorRemoved(manager, "third")
+    onExecutorRemoved(manager, "fourth")
+    onExecutorRemoved(manager, "fifth")
+
+    manager.reset()
+
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorsPendingToRemove(manager) === Set.empty)
+    assert(removeTimes(manager) === Map.empty)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
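For context on the test: helpers such as `numExecutorsTarget(manager)`, `numExecutorsToAdd(manager)`, and `removeTimes(manager)` read private state of the manager. In this suite they are built on ScalaTest's `PrivateMethodTester`; a rough sketch of the pattern (the accessor shown is illustrative):

```scala
import org.scalatest.PrivateMethodTester

// One PrivateMethod handle per private member, invoked reflectively
// on the manager under test.
trait AllocationManagerAccessors extends PrivateMethodTester {
  private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)

  def numExecutorsTarget(manager: AnyRef): Int = {
    manager invokePrivate _numExecutorsTarget()
  }
}
```
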
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e3dd87798f0184a4e82e69a557711b05f5ee5c09..1431bceb256a7107d65ee856ca3c6a848b4124b6 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
 
+  // Whether to reset this backend when the AM registers; set after the first registration.
+  private var shouldResetOnAmRegister = false
+
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -155,6 +158,16 @@ private[spark] abstract class YarnSchedulerBackend(
     new YarnDriverEndpoint(rpcEnv, properties)
   }
 
+  /**
+   * Reset the state of the SchedulerBackend to its initial state. This happens when the AM
+   * fails and re-registers itself with the driver after a failure; the stale state in the
+   * driver should then be cleaned up.
+   */
+  override protected def reset(): Unit = {
+    super.reset()
+    sc.executorAllocationManager.foreach(_.reset())
+  }
+
   /**
    * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
    * This endpoint communicates with the executors and queries the AM for an executor's exit
@@ -218,6 +231,8 @@ private[spark] abstract class YarnSchedulerBackend(
         case None =>
           logWarning("Attempted to check for an executor loss reason" +
             " before the AM has registered!")
+          driverEndpoint.askWithRetry[Boolean](
+            RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
     }
 
@@ -225,6 +240,13 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
+        if (!shouldResetOnAmRegister) {
+          shouldResetOnAmRegister = true
+        } else {
+          // The AM was already registered before; this likely means the AM failed and a
+          // new one registered itself after the failure. This only happens in yarn-client mode.
+          reset()
+        }
 
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       if (amEndpoint.exists(_.address == remoteAddress)) {
         logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+        amEndpoint = None
       }
     }
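The `shouldResetOnAmRegister` flow above distills to: the first `RegisterClusterManager` merely arms the flag, and any later registration is treated as a recovered AM whose stale driver-side state must be cleared (hence also `amEndpoint = None` on disconnect, so subsequent executor-loss queries fall into the `case None` branch). A self-contained sketch of the pattern (hypothetical names):

```scala
// Hypothetical distillation of the registration-tracking pattern above.
class AmRegistrationTracker(onReRegister: () => Unit) {
  private var registeredBefore = false

  def onRegister(): Unit = synchronized {
    if (!registeredBefore) {
      registeredBefore = true // first AM: nothing to clean up
    } else {
      onReRegister()          // a new AM after a failure: reset stale state
    }
  }
}

object AmRegistrationTrackerExample {
  def main(args: Array[String]): Unit = {
    val tracker = new AmRegistrationTracker(() => println("resetting stale state"))
    tracker.onRegister() // first registration: silent
    tracker.onRegister() // re-registration: triggers the reset callback
  }
}
```
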