Commit be5d1912 authored by Andrew Or

[SPARK-9795] Dynamic allocation: avoid double counting when killing same executor twice

This is based on KaiXinXiaoLei's changes in #7716.

The issue is that when someone calls `sc.killExecutor("1")` on the same executor twice quickly, then the executor target will be adjusted downwards by 2 instead of 1 even though we're only actually killing one executor. In certain cases where we don't adjust the target back upwards quickly, we'll end up with jobs hanging.
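
To make the double counting concrete, here is a minimal sketch of the faulty bookkeeping. The names (`target`, `pendingToRemove`, `killExecutor`) are illustrative stand-ins, not Spark's actual fields:

    // Toy model of the pre-fix accounting; names are illustrative, not Spark's.
    import scala.collection.mutable

    var target = 2                                   // executor target reported to the cluster manager
    val pendingToRemove = mutable.Set.empty[String]  // executors already scheduled for removal

    def killExecutor(id: String): Unit = {
      pendingToRemove += id   // the set dedupes the id...
      target -= 1             // ...but the target is decremented unconditionally
    }

    killExecutor("1")
    killExecutor("1")         // same executor requested twice in quick succession
    println(target)           // 0 -- two decrements for one real kill; at target 0, jobs can hang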

This is a real danger because `killExecutor` is called from several places:
- `HeartbeatReceiver` kills an executor that has stopped sending heartbeats
- `ExecutorAllocationManager` kills an executor that has been idle
- User code may call it directly, which can interfere with the callers above

While it's not clear whether this fixes SPARK-9745, fixing this potential race condition seems like a strict improvement. I've added a regression test to illustrate the issue.

Author: Andrew Or <andrew@databricks.com>

Closes #8078 from andrewor14/da-double-kill.
parent 2e680668
@@ -422,16 +422,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       logWarning(s"Executor to kill $id does not exist!")
     }
 
+    // If an executor is already pending to be removed, do not kill it again (SPARK-9795)
+    val executorsToKill = knownExecutors.filter { id => !executorsPendingToRemove.contains(id) }
+    executorsPendingToRemove ++= executorsToKill
+
     // If we do not wish to replace the executors we kill, sync the target number of executors
     // with the cluster manager to avoid allocating new ones. When computing the new target,
     // take into account executors that are pending to be added or removed.
     if (!replace) {
-      doRequestTotalExecutors(numExistingExecutors + numPendingExecutors
-        - executorsPendingToRemove.size - knownExecutors.size)
+      doRequestTotalExecutors(
+        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
     }
 
-    executorsPendingToRemove ++= knownExecutors
-    doKillExecutors(knownExecutors)
+    doKillExecutors(executorsToKill)
   }
 
   /**
...
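
For contrast, here is the same toy model with the fix applied: ids already pending removal are filtered out before the target is adjusted, so repeated kill requests for one executor are idempotent. Again a sketch with illustrative names, not the patched code above:

    import scala.collection.mutable

    var target = 2
    val pendingToRemove = mutable.Set.empty[String]

    def killExecutor(id: String): Unit = {
      // Skip executors that are already pending removal (SPARK-9795)
      if (!pendingToRemove.contains(id)) {
        pendingToRemove += id
        target -= 1
      }
    }

    killExecutor("1")
    killExecutor("1")   // second request is a no-op
    println(target)     // 1 -- the target is lowered only once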
@@ -283,6 +283,26 @@ class StandaloneDynamicAllocationSuite
     assert(master.apps.head.getExecutorLimit === 1000)
   }
 
+  test("kill the same executor twice (SPARK-9795)") {
+    sc = new SparkContext(appConf)
+    val appId = sc.applicationId
+    assert(master.apps.size === 1)
+    assert(master.apps.head.id === appId)
+    assert(master.apps.head.executors.size === 2)
+    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    // sync executors between the Master and the driver, needed because
+    // the driver refuses to kill executors it does not know about
+    syncExecutors(sc)
+    // kill the same executor twice
+    val executors = getExecutorIds(sc)
+    assert(executors.size === 2)
+    assert(sc.killExecutor(executors.head))
+    assert(sc.killExecutor(executors.head))
+    assert(master.apps.head.executors.size === 1)
+    // The limit should not be lowered twice
+    assert(master.apps.head.getExecutorLimit === 1)
+  }
+
   // ===============================
   // | Utility methods for testing |
   // ===============================
...
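
Note how the final assertions capture the fix: with two executors and two kill requests for the same id, only one distinct executor ends up pending removal, so the target synced to the Master is 2 - 1 = 1. Before this patch the limit would have been lowered twice, to 0, which is exactly the state that could leave jobs hanging.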