Skip to content
Snippets Groups Projects
Commit b60b9fc1 authored by Jimmy Xiang's avatar Jimmy Xiang Committed by Marcelo Vanzin
Browse files

[SPARK-19757][CORE] DriverEndpoint#makeOffers race against...

[SPARK-19757][CORE] DriverEndpoint#makeOffers race against CoarseGrainedSchedulerBackend#killExecutors

## What changes were proposed in this pull request?
While some executors are being killed due to idleness, if some new tasks come in, the driver could assign them to executors that are in the process of being killed. These tasks will fail later when those executors are lost. This patch makes sure CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are properly synchronized.

## How was this patch tested?
manual tests

Author: Jimmy Xiang <jxiang@apache.org>

Closes #17091 from jxiang/spark-19757.
parent 206030bd
No related branches found
No related tags found
No related merge requests found
@@ -222,12 +222,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
-      // Filter out executors under killing
-      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
-      val workOffers = activeExecutors.map { case (id, executorData) =>
-        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toIndexedSeq
-      launchTasks(scheduler.resourceOffers(workOffers))
+      // Make sure no executor is killed while some task is launching on it
+      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+        // Filter out executors under killing
+        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
+        val workOffers = activeExecutors.map { case (id, executorData) =>
+          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
+        }.toIndexedSeq
+        scheduler.resourceOffers(workOffers)
+      }
+      if (!taskDescs.isEmpty) {
+        launchTasks(taskDescs)
+      }
     }
override def onDisconnected(remoteAddress: RpcAddress): Unit = { override def onDisconnected(remoteAddress: RpcAddress): Unit = {
...@@ -240,12 +246,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp ...@@ -240,12 +246,20 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor // Make fake resource offers on just one executor
private def makeOffers(executorId: String) { private def makeOffers(executorId: String) {
// Filter out executors under killing // Make sure no executor is killed while some task is launching on it
if (executorIsAlive(executorId)) { val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
val executorData = executorDataMap(executorId) // Filter out executors under killing
val workOffers = IndexedSeq( if (executorIsAlive(executorId)) {
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) val executorData = executorDataMap(executorId)
launchTasks(scheduler.resourceOffers(workOffers)) val workOffers = IndexedSeq(
new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
scheduler.resourceOffers(workOffers)
} else {
Seq.empty
}
}
if (!taskDescs.isEmpty) {
launchTasks(taskDescs)
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment