diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0dae0e614e17d5b47b9fa5aaca641d22c0be5b82..10d55c87fb8de00b9134e43ff8a137f9fb65c4a0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
    * */
-  protected def reset(): Unit = synchronized {
-    numPendingExecutors = 0
-    executorsPendingToRemove.clear()
+  protected def reset(): Unit = {
+    val executors = synchronized {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+      Set() ++ executorDataMap.keys
+    }
 
     // Remove all the lingering executors that should be removed but not yet. The reason might be
     // because (1) disconnected event is not yet received; (2) executors die silently.
-    executorDataMap.toMap.foreach { case (eid, _) =>
-      driverEndpoint.askWithRetry[Boolean](
-        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+    executors.foreach { eid =>
+      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
     }
   }
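
The new reset() narrows the synchronized region: the set of executor ids is snapshotted while the lock is held, and the removal requests are then issued outside the lock via removeExecutor instead of a synchronous askWithRetry on the driver endpoint. Below is a minimal, self-contained sketch of that snapshot-then-act locking pattern; all names in it (Registry, register, removeEntry, pendingCount) are hypothetical and only illustrate the pattern, they are not Spark's API.

import scala.collection.mutable

// Sketch only: mutate and snapshot shared state inside the lock, then perform
// the potentially slow or re-entrant work on the snapshot outside the lock.
class Registry {
  private val entries = mutable.HashMap.empty[String, String]
  private var pendingCount = 0

  def register(id: String, data: String): Unit = synchronized {
    entries(id) = data
  }

  def reset(): Unit = {
    // Take an immutable snapshot of the keys while holding the monitor...
    val ids = synchronized {
      pendingCount = 0
      entries.keys.toSet
    }
    // ...then issue the removals without the lock, so a removal path that
    // needs the same monitor cannot deadlock against reset().
    ids.foreach(id => removeEntry(id, "stale entry after reset"))
  }

  private def removeEntry(id: String, reason: String): Unit = synchronized {
    entries.remove(id)
    println(s"removed $id: $reason")
  }
}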