diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 263e6197a6f45df7ca9f2e00867cea4ae2e3e941..5177557132dbf4868d3dd56492338792a80225f7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       taskId: String,
       reason: String): Unit = {
     stateLock.synchronized {
-      removeExecutor(taskId, SlaveLost(reason))
+      // Do not call removeExecutor() after this scheduler backend has been stopped:
+      // removeExecutor() sends a message to the driver endpoint internally, and once the
+      // backend is stopped that endpoint is no longer available, so the call would throw.
+      if (!stopCalled) {
+        removeExecutor(taskId, SlaveLost(reason))
+      }
       slaves(slaveId).taskIDs.remove(taskId)
     }
   }
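
The guard above is an instance of a broader shutdown pattern: a callback that may still fire after stop() first checks a stop flag before talking to a component that stop() tears down. The following is a minimal, self-contained sketch of that pattern rather than the actual Spark code; Backend, statusUpdate, removeExecutor and stopCalled below are simplified stand-ins for the real classes and methods.

    // Minimal sketch of the "check a stop flag before messaging a torn-down component"
    // pattern. All names are illustrative stand-ins, not the real Spark/Mesos classes.
    object StopGuardSketch {
      class Backend {
        // Volatile so a callback running on another thread sees the flag promptly.
        @volatile private var stopCalled = false
        private val stateLock = new Object

        private def removeExecutor(taskId: String, reason: String): Unit = {
          // In the real backend this sends a RemoveExecutor message to the driver
          // endpoint, which throws once the endpoint has been shut down.
          println(s"removeExecutor($taskId, $reason)")
        }

        def statusUpdate(taskId: String, reason: String): Unit = stateLock.synchronized {
          // Skip the endpoint call once stop() has run; local bookkeeping stays safe.
          if (!stopCalled) {
            removeExecutor(taskId, reason)
          }
        }

        def stop(): Unit = stateLock.synchronized {
          stopCalled = true
          // ... shut down the driver endpoint and other resources here ...
        }
      }

      def main(args: Array[String]): Unit = {
        val backend = new Backend
        backend.statusUpdate("task-0", "executor lost")  // reaches removeExecutor
        backend.stop()
        backend.statusUpdate("task-1", "executor lost")  // skipped after stop()
      }
    }
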
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a74fdf79a13cba2824006731768232a2018360b0..0e66979901540b3637cd12df4fabe3ced9745720 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.mesos.Utils._
 
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   private var backend: MesosCoarseGrainedSchedulerBackend = _
   private var externalShuffleClient: MesosExternalShuffleClient = _
   private var driverEndpoint: RpcEndpointRef = _
+  @volatile private var stopCalled = false
 
   test("mesos supports killing and limiting executors") {
     setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(!dockerInfo.getForcePullImage)
   }
 
+  test("Do not call removeExecutor() after backend is stopped") {
+    setBackend()
+
+    // Launch a task on a valid offer
+    val offers = List((backend.executorMemory(sc), 1))
+    offerResources(offers)
+    verifyTaskLaunched(driver, "o1")
+
+    // Launch a thread that delivers a status update once the backend has been stopped
+    new Thread {
+      override def run(): Unit = {
+        while (!stopCalled) {
+          Thread.sleep(100)
+        }
+
+        val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+        backend.statusUpdate(driver, status)
+      }
+    }.start()
+
+    backend.stop()
+    // No method of the backend that sends messages to the driver endpoint should be
+    // invoked after the backend has been stopped.
+    verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
         mesosDriver = newDriver
       }
 
+      override def stopExecutors(): Unit = {
+        stopCalled = true
+      }
+
       markRegistered()
     }
     backend.start()
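
The final assertion in the new test hinges on Mockito's never() verification mode against the mocked driverEndpoint. Below is a stripped-down sketch of just that mechanism, assuming only mockito-core on the classpath; Greeter and send are hypothetical names introduced for illustration, and only the verify(..., never()) call mirrors what the test does.

    import org.mockito.Mockito.{mock, never, verify}

    // Standalone sketch of Mockito's "verify never" check; Greeter is a made-up trait.
    object VerifyNeverSketch {
      trait Greeter {
        def send(msg: String): Unit
      }

      def main(args: Array[String]): Unit = {
        val greeter = mock(classOf[Greeter])
        // Nothing calls send(), so this verification passes; it would fail the test
        // if send("hello") had been invoked, which is how the test above checks that
        // no RemoveExecutor message reaches the driver endpoint after stop().
        verify(greeter, never()).send("hello")
        println("""verified: send("hello") was never called""")
      }
    }
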