Skip to content
Snippets Groups Projects
Commit 48467f4e authored by Shixiong Zhu's avatar Shixiong Zhu Committed by Andrew Or
Browse files

[SPARK-14416][CORE] Add thread-safe comments for CoarseGrainedSchedulerBackend's fields

## What changes were proposed in this pull request?

While I was reviewing #12078, I found that most of CoarseGrainedSchedulerBackend's mutable fields don't have any comments about their thread-safety assumptions, which makes it hard to figure out which parts of the code must be protected by the lock. This PR only adds comments/annotations for those fields and tightens the access modifiers of some of them.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12188 from zsxwing/comments.
parent adbfdb87
No related branches found
No related tags found
No related merge requests found
......@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
......@@ -43,24 +44,30 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
extends ExecutorAllocationClient with SchedulerBackend with Logging
{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
protected val totalCoreCount = new AtomicInteger(0)
// Total number of executors that are currently registered
var totalRegisteredExecutors = new AtomicInteger(0)
val conf = scheduler.sc.conf
protected val totalRegisteredExecutors = new AtomicInteger(0)
protected val conf = scheduler.sc.conf
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
// Submit tasks only after (registered resources / total expected resources)
// is equal to at least this value, which is a double between 0 and 1.
var minRegisteredRatio =
private val _minRegisteredRatio =
math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
// Submit tasks after maxRegisteredWaitingTime milliseconds
// if minRegisteredRatio has not yet been reached
val maxRegisteredWaitingTimeMs =
private val maxRegisteredWaitingTimeMs =
conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
val createTime = System.currentTimeMillis()
private val createTime = System.currentTimeMillis()
// Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
// protection. But accessing `executorDataMap` out of `DriverEndpoint.receive/receiveAndReply`
// must be protected by `CoarseGrainedSchedulerBackend.this`. Besides, `executorDataMap` should
// only be modified in `DriverEndpoint.receive/receiveAndReply` with protection by
// `CoarseGrainedSchedulerBackend.this`.
private val executorDataMap = new HashMap[String, ExecutorData]
// Number of executors requested from the cluster manager that have not registered yet
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private var numPendingExecutors = 0
private val listenerBus = scheduler.sc.listenerBus
......@@ -68,23 +75,26 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Executors we have requested the cluster manager to kill that have not died yet; maps
// the executor ID to whether it was explicitly killed by the driver (and thus shouldn't
// be considered an app-related failure).
@GuardedBy("CoarseGrainedSchedulerBackend.this")
private val executorsPendingToRemove = new HashMap[String, Boolean]
// A map to store hostname with its possible task number running on it
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
// The number of pending tasks that have locality requirements
@GuardedBy("CoarseGrainedSchedulerBackend.this")
protected var localityAwareTasks = 0
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
// The current maximum executor ID, used to re-register appMaster
protected var currentExecutorIdCounter = 0
@volatile protected var currentExecutorIdCounter = 0
class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {
// Executors that have been lost, but for which we don't yet know the real exit reason.
protected val executorsPendingLossReason = new HashSet[String]
// If this DriverEndpoint is changed to support multiple threads,
// then this may need to be changed so that we don't share the serializer
// instance across threads
......@@ -261,7 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
// Remove a disconnected slave from the cluster
def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
......@@ -313,7 +323,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}
var driverEndpoint: RpcEndpointRef = null
val taskIdsOnSlave = new HashMap[String, HashSet[String]]
protected def minRegisteredRatio: Double = _minRegisteredRatio
override def start() {
val properties = new ArrayBuffer[(String, String)]
......@@ -417,7 +428,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
/**
* Return the number of executors currently registered with this backend.
*/
def numExistingExecutors: Int = executorDataMap.size
private def numExistingExecutors: Int = executorDataMap.size
/**
* Request an additional number of executors from the cluster manager.
......
......@@ -39,9 +39,12 @@ private[spark] abstract class YarnSchedulerBackend(
sc: SparkContext)
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
minRegisteredRatio = 0.8
}
// Minimum registered-resources ratio for this backend: 0.8 when
// `spark.scheduler.minRegisteredResourcesRatio` is not set in the conf,
// otherwise the value provided by the superclass.
override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
} else {
super.minRegisteredRatio
}
protected var totalExpectedExecutors = 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment