Commit 5915588a authored by Ryan Blue, committed by Marcelo Vanzin

[SPARK-20540][CORE] Fix unstable executor requests.

## What changes were proposed in this pull request?

This commit fixes two problems. First, the ExecutorAllocationManager sets
a timeout to avoid requesting executors too often. However, that deadline
is always advanced from its previous value by the timeout interval rather
than reset from the current time. If a run is delayed by lock contention
for longer than the sustained scheduler backlog timeout, the deadline stays
in the past and the manager requests more executors on every run. This
appears to be the main cause of SPARK-20540.
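
A minimal sketch of the timing bug, assuming a simplified scheduling loop
(variable names mirror the diff below; this is not the real
ExecutorAllocationManager):

```scala
// Simplified model of the backlog-timer update, not the actual Spark class.
object BacklogTimerSketch {
  val sustainedSchedulerBacklogTimeoutS = 1L
  var addTime = 0L

  // Old behavior: the deadline advances from its previous value, so a run that
  // was delayed (e.g. blocked on a lock) can leave addTime still in the past,
  // causing executors to be requested again on every subsequent run.
  def updateTimerOld(): Unit = {
    addTime += sustainedSchedulerBacklogTimeoutS * 1000
  }

  // Fixed behavior: the next request is always scheduled relative to "now".
  def updateTimerFixed(now: Long): Unit = {
    addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
  }
}
```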

The second problem is that the CoarseGrainedSchedulerBackend does not track
the total number of executors it has requested. Instead, it recomputes the
value from three pieces of current state: the number of registered executors,
the number of executors pending removal, and the number of pending (requested
but not yet registered) executors. The pending count, however, never drops
below zero, even when more executors are registered than were requested. When
executors are killed and not replaced, the request sent to YARN can therefore
be too high because the scheduler's state is slightly out of date. This is
fixed by tracking the currently requested total explicitly.
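
A minimal sketch of the accounting problem, with simplified state and no
locking; requestedTotalExecutors is the new explicit counter from the diff,
everything else is assumed for illustration:

```scala
// Simplified model of the requested-executor accounting, not the actual
// CoarseGrainedSchedulerBackend (which guards this state with a lock).
object RequestedTotalSketch {
  var numExistingExecutors = 10      // executors currently registered
  var numPendingExecutors = 0        // requested but not yet registered; never negative
  var executorsPendingToRemove = 0   // kills that are still in flight
  var requestedTotalExecutors = 10   // new: the requested total, tracked explicitly

  // Old behavior: recompute the request from current scheduler state. If that
  // state is slightly out of date, the derived value can be too high.
  def derivedTotal: Int =
    numExistingExecutors + numPendingExecutors - executorsPendingToRemove

  // New behavior (mirrors the killExecutors change): adjust the tracked total
  // directly when executors are killed without replacement.
  def killWithoutReplace(numKilled: Int): Int = {
    requestedTotalExecutors = math.max(requestedTotalExecutors - numKilled, 0)
    requestedTotalExecutors
  }
}
```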

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation.

(cherry picked from commit 2b2dd08e)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
parent 868b4a1a
ExecutorAllocationManager.scala
@@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime += sustainedSchedulerBacklogTimeoutS * 1000
+      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
       delta
     } else {
       0
CoarseGrainedSchedulerBackend.scala
@@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
+  // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private var requestedTotalExecutors = 0
+
   // Number of executors requested from the cluster manager that have not registered yet
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
@@ -390,6 +394,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * */
   protected def reset(): Unit = {
     val executors = synchronized {
+      requestedTotalExecutors = 0
       numPendingExecutors = 0
       executorsPendingToRemove.clear()
       Set() ++ executorDataMap.keys
@@ -463,12 +468,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
 
     val response = synchronized {
+      requestedTotalExecutors += numAdditionalExecutors
       numPendingExecutors += numAdditionalExecutors
       logDebug(s"Number of pending executors is now $numPendingExecutors")
+      if (requestedTotalExecutors !=
+          (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+        logDebug(
+          s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
+             |requestedTotalExecutors  = $requestedTotalExecutors
+             |numExistingExecutors     = $numExistingExecutors
+             |numPendingExecutors      = $numPendingExecutors
+             |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+      }
 
       // Account for executors pending to be added or removed
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+      doRequestTotalExecutors(requestedTotalExecutors)
     }
 
     defaultAskTimeout.awaitResult(response)
@@ -500,6 +514,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     val response = synchronized {
+      this.requestedTotalExecutors = numExecutors
       this.localityAwareTasks = localityAwareTasks
       this.hostToLocalTaskCount = hostToLocalTaskCount
@@ -575,8 +590,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (!replace) {
-          doRequestTotalExecutors(
-            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
+          if (requestedTotalExecutors !=
+              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+            logDebug(
+              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+                 |requestedTotalExecutors  = $requestedTotalExecutors
+                 |numExistingExecutors     = $numExistingExecutors
+                 |numPendingExecutors      = $numPendingExecutors
+                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+          }
+          doRequestTotalExecutors(requestedTotalExecutors)
         } else {
           numPendingExecutors += knownExecutors.size
           Future.successful(true)
StandaloneDynamicAllocationSuite.scala
@@ -354,12 +354,13 @@ class StandaloneDynamicAllocationSuite
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
@@ -378,12 +379,13 @@ class StandaloneDynamicAllocationSuite
   test("the pending replacement executors should not be lost (SPARK-10515)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about