Commit b85f9a24 authored by xutingjun, committed by Andrew Or

[SPARK-8366] maxNumExecutorsNeeded should properly handle failed tasks

Author: xutingjun <xutingjun@huawei.com>
Author: meiyoula <1039320815@qq.com>

Closes #6817 from XuTingjun/SPARK-8366.
parent b1581ac2
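For context: `maxNumExecutorsNeeded` sizes the executor request from the pending and running task counts. A minimal sketch of its shape, assumed from the surrounding ExecutorAllocationManager code rather than taken from this diff:

// Assumed shape of the estimator this patch fixes; illustrative only,
// not part of the diff below.
private def maxNumExecutorsNeeded(): Int = {
  val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
  // Round up: e.g. 5 tasks at 2 tasks per executor => 3 executors.
  (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
}

A failed task is neither running nor pending the moment it ends, so before this patch the estimate could drop even though the scheduler will resubmit the task.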
@@ -599,14 +599,8 @@ private[spark] class ExecutorAllocationManager(
         // If this is the last pending task, mark the scheduler queue as empty
         stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
-        val numTasksScheduled = stageIdToTaskIndices(stageId).size
-        val numTasksTotal = stageIdToNumTasks.getOrElse(stageId, -1)
-        if (numTasksScheduled == numTasksTotal) {
-          // No more pending tasks for this stage
-          stageIdToNumTasks -= stageId
-          if (stageIdToNumTasks.isEmpty) {
-            allocationManager.onSchedulerQueueEmpty()
-          }
-        }
+        if (totalPendingTasks() == 0) {
+          allocationManager.onSchedulerQueueEmpty()
+        }

         // Mark the executor on which this task is scheduled as busy
@@ -618,6 +612,8 @@ private[spark] class ExecutorAllocationManager(
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
+      val taskIndex = taskEnd.taskInfo.index
+      val stageId = taskEnd.stageId
       allocationManager.synchronized {
         numRunningTasks -= 1
         // If the executor is no longer running any scheduled tasks, mark it as idle
@@ -628,6 +624,16 @@ private[spark] class ExecutorAllocationManager(
             allocationManager.onExecutorIdle(executorId)
           }
         }
+
+        // If the task failed, we expect it to be resubmitted later. To ensure we have
+        // enough resources to run the resubmitted task, we need to mark the scheduler
+        // as backlogged again if it's not already marked as such (SPARK-8366)
+        if (taskEnd.reason != Success) {
+          if (totalPendingTasks() == 0) {
+            allocationManager.onSchedulerBacklogged()
+          }
+          stageIdToTaskIndices.get(stageId).foreach { _.remove(taskIndex) }
+        }
       }
     }
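Both hunks above hinge on `totalPendingTasks()`, which derives each stage's pending count from its declared task total minus the task indices already started. A minimal sketch, assuming the bookkeeping fields used above (not part of this diff):

// Pending = declared tasks minus indices already started, summed over stages.
// Removing a failed task's index from stageIdToTaskIndices (as onTaskEnd now
// does) makes that task count as pending again until it is restarted.
def totalPendingTasks(): Int = {
  stageIdToNumTasks.map { case (stageId, numTasks) =>
    numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
  }.sum
}

The extra `onSchedulerBacklogged()` call re-arms the add-executor timer that `onSchedulerQueueEmpty()` had cancelled, so the resubmitted task can actually trigger a new executor request.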
@@ -206,8 +206,8 @@ class ExecutorAllocationManagerSuite
     val task2Info = createTaskInfo(1, 0, "executor-1")
     sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task1Info, null))
-    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, null, task2Info, null))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))

     assert(adjustRequestedExecutors(manager) === -1)
   }
@@ -787,6 +787,24 @@ class ExecutorAllocationManagerSuite
       Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
   }

+  test("SPARK-8366: maxNumExecutorsNeeded should properly handle failed tasks") {
+    sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(maxNumExecutorsNeeded(manager) === 0)
+
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    val taskInfo = createTaskInfo(1, 1, "executor-1")
+    sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+
+    // If the task is failed, we expect it to be resubmitted later.
+    val taskEndReason = ExceptionFailure(null, null, null, null, null)
+    sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
+    assert(maxNumExecutorsNeeded(manager) === 1)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
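To see the accounting end to end, here is a hypothetical, self-contained model of the fix; every name below is local to this sketch, which mirrors the Spark logic but is not Spark code:

object Spark8366Demo {
  import scala.collection.mutable

  // One stage with one task; one task slot per executor.
  val stageIdToNumTasks = mutable.Map(0 -> 1)
  val stageIdToTaskIndices = mutable.Map.empty[Int, mutable.HashSet[Int]]
  var numRunningTasks = 0
  val tasksPerExecutor = 1

  def totalPendingTasks(): Int =
    stageIdToNumTasks.map { case (stageId, numTasks) =>
      numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
    }.sum

  def maxNumExecutorsNeeded(): Int = {
    val tasks = totalPendingTasks() + numRunningTasks
    (tasks + tasksPerExecutor - 1) / tasksPerExecutor
  }

  def main(args: Array[String]): Unit = {
    println(maxNumExecutorsNeeded()) // 1: the task is pending
    stageIdToTaskIndices.getOrElseUpdate(0, new mutable.HashSet[Int]) += 0
    numRunningTasks += 1             // task 0 starts
    println(maxNumExecutorsNeeded()) // 1: the task is running
    numRunningTasks -= 1             // task 0 fails
    println(maxNumExecutorsNeeded()) // 0 before the fix: nothing pending or running
    stageIdToTaskIndices.get(0).foreach(_.remove(0)) // the fix: drop its index
    println(maxNumExecutorsNeeded()) // 1 again: resubmission is accounted for
  }
}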