diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 70470cc6d203735daf3473eead9ad8db78c7a74e..f71bfd489dbf26fb3f1bbeab61e855c92f7a0e10 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
 
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
+import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
@@ -43,24 +44,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   extends ExecutorAllocationClient with SchedulerBackend with Logging
 {
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
-  var totalCoreCount = new AtomicInteger(0)
+  protected val totalCoreCount = new AtomicInteger(0)
   // Total number of executors that are currently registered
-  var totalRegisteredExecutors = new AtomicInteger(0)
-  val conf = scheduler.sc.conf
+  protected val totalRegisteredExecutors = new AtomicInteger(0)
+  protected val conf = scheduler.sc.conf
   private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, which is a double between 0 and 1.
-  var minRegisteredRatio =
+  private val _minRegisteredRatio =
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
-  val maxRegisteredWaitingTimeMs =
+  private val maxRegisteredWaitingTimeMs =
     conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
-  val createTime = System.currentTimeMillis()
+  private val createTime = System.currentTimeMillis()
 
+  // Accessing `executorDataMap` inside `DriverEndpoint.receive/receiveAndReply` doesn't need
+  // any protection, but accessing it outside of those handlers must be protected by locking on
+  // `CoarseGrainedSchedulerBackend.this`. In addition, `executorDataMap` should only be
+  // modified in `DriverEndpoint.receive/receiveAndReply`, and every modification must hold the
+  // `CoarseGrainedSchedulerBackend.this` lock.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
   // Number of executors requested from the cluster manager that have not registered yet
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
 
   private val listenerBus = scheduler.sc.listenerBus
@@ -68,23 +75,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet; maps
   // the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
   // be considered an app-related failure).
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private val executorsPendingToRemove = new HashMap[String, Boolean]
 
   // A map from each hostname to the number of pending tasks that could run on it
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
 
   // The number of pending tasks that have locality requirements
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var localityAwareTasks = 0
 
-  // Executors that have been lost, but for which we don't yet know the real exit reason.
-  protected val executorsPendingLossReason = new HashSet[String]
-
   // The current max ExecutorId, used when the ApplicationMaster re-registers
-  protected var currentExecutorIdCounter = 0
+  @volatile protected var currentExecutorIdCounter = 0
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
+    // Executors that have been lost, but for which we don't yet know the real exit reason.
+    protected val executorsPendingLossReason = new HashSet[String]
+
     // If this DriverEndpoint is changed to support multiple threads,
     // then this may need to be changed so that we don't share the serializer
     // instance across threads
@@ -261,7 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Remove a disconnected slave from the cluster
-    def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
+    private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
@@ -313,7 +323,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   var driverEndpoint: RpcEndpointRef = null
-  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
+
+  protected def minRegisteredRatio: Double = _minRegisteredRatio
 
   override def start() {
     val properties = new ArrayBuffer[(String, String)]
@@ -417,7 +428,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Return the number of executors currently registered with this backend.
    */
-  def numExistingExecutors: Int = executorDataMap.size
+  private def numExistingExecutors: Int = executorDataMap.size
 
   /**
    * Request an additional number of executors from the cluster manager.
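
The `executorDataMap` comment and the new `@GuardedBy("CoarseGrainedSchedulerBackend.this")` annotations in this file all describe one single-writer locking discipline: the driver endpoint's message thread is the only writer, every write holds the backend's monitor, reads on the message thread are lock-free, and reads on any other thread must take the monitor. The sketch below is a minimal illustration of that discipline, not Spark code; `Backend`, `endpoint`, and the method names are invented for the example, and plain comments stand in for the documentation-only `@GuardedBy` annotation.

```scala
import scala.collection.mutable.HashMap

// Minimal sketch (not Spark code) of the single-writer discipline above.
class Backend {
  // Guarded by `Backend.this`, exactly as `@GuardedBy` documents it:
  //  - mutated only on the endpoint's single message thread, under the lock;
  //  - read lock-free on that thread, because it is the only writer;
  //  - read under the lock from every other thread.
  private val executorDataMap = new HashMap[String, Int]

  // Single writer, many readers, no compound check-then-act elsewhere:
  // `@volatile` suffices, as the patch does for `currentExecutorIdCounter`.
  @volatile protected var currentExecutorIdCounter = 0

  // Stands in for DriverEndpoint: a ThreadSafeRpcEndpoint handles at most
  // one message at a time, so these handlers never run concurrently.
  private object endpoint {
    def receiveRegister(executorId: String, cores: Int): Unit =
      Backend.this.synchronized {
        // The only mutation site, and it holds the lock so that readers on
        // other threads never observe a half-updated map.
        executorDataMap(executorId) = cores
        // Not atomic, but safe: this thread is the only writer.
        currentExecutorIdCounter += 1
      }

    def receiveCores(executorId: String): Option[Int] =
      // Lock-free read, legal only on this thread (the sole writer).
      executorDataMap.get(executorId)
  }

  // Called from arbitrary threads, so it must take the lock.
  def numExistingExecutors: Int = synchronized { executorDataMap.size }

  // Entry point so the sketch is usable as a complete unit.
  def register(id: String, cores: Int): Unit = endpoint.receiveRegister(id, cores)
}
```

The lock-free reads inside the endpoint are sound only while the endpoint thread remains the sole writer, which is consistent with the patch also narrowing `removeExecutor` and `numExistingExecutors` to `private`: fewer call sites means fewer places where the invariant can be broken.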
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 5aeaf44732f7b7819e69c8a4b8b1345d339578a9..8720ee57fec777a26373c12c5d004b43d5e76f65 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -39,9 +39,12 @@ private[spark] abstract class YarnSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
 
-  if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
-    minRegisteredRatio = 0.8
-  }
+  override val minRegisteredRatio =
+    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+      0.8
+    } else {
+      super.minRegisteredRatio
+    }
 
   protected var totalExpectedExecutors = 0
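
The YARN hunk above is the consumer of the new `_minRegisteredRatio` field: instead of subclasses reassigning a shared `var` after construction, the base class keeps the configured value in a `private val` and publishes it through a `protected def`, which `YarnSchedulerBackend` overrides with a `val`. A stripped-down sketch of that pattern under invented names (`BaseBackend`, `YarnLikeBackend`, with a plain `Map` standing in for `SparkConf`):

```scala
// Stripped-down sketch (not Spark code) of the val-plus-accessor pattern.
class BaseBackend(conf: Map[String, String]) {
  private val _minRegisteredRatio = math.min(1.0,
    conf.get("spark.scheduler.minRegisteredResourcesRatio").map(_.toDouble).getOrElse(0.0))

  // The extension point: subclasses override this accessor instead of
  // mutating shared state after the base class is built.
  protected def minRegisteredRatio: Double = _minRegisteredRatio

  def sufficientResourcesRegistered(registered: Int, expected: Int): Boolean =
    registered >= expected * minRegisteredRatio
}

class YarnLikeBackend(conf: Map[String, String]) extends BaseBackend(conf) {
  // Overriding a `def` with a stable `val` is legal Scala and fixes the
  // value at construction; 0.8 applies only when the user did not set the
  // property, mirroring the YARN hunk above.
  override val minRegisteredRatio: Double =
    if (conf.contains("spark.scheduler.minRegisteredResourcesRatio")) super.minRegisteredRatio
    else 0.8
}
```

One caveat the pattern carries: the overriding `val` is initialized during subclass construction, so the base-class constructor must not call `minRegisteredRatio` (it would observe the uninitialized default `0.0`). That appears safe here, since the ratio is consulted only once the backend is running.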