Commit b4546ba9 authored by Kay Ousterhout

Fix bug where scheduler could hang after task failure.

When a task fails, we need to call reviveOffers() so that the task can be
rescheduled on a different machine. In the current code, the state in
ClusterTaskSetManager indicating which tasks are pending may be updated
after reviveOffers() is called (there is a race condition here), so when
reviveOffers() runs, the task set manager does not yet realize that there
are failed tasks that need to be relaunched.
parent 1a4cfbea
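
To make the race concrete, here is a minimal, self-contained sketch of the ordering problem. It is not the Spark code: ToyTaskSetManager, ToyBackend, and the printed messages are hypothetical stand-ins that only model the interaction between updating the pending-task state and calling reviveOffers().

// Minimal sketch (assumed names, not the actual Spark implementation) of why
// reviveOffers() must run only after the task set manager has recorded the
// failed task as pending again.
object RevivalOrderingSketch {

  class ToyTaskSetManager {
    private var pendingTaskIds = List.empty[Long]

    // Models the task set manager recording a failed task so it can be re-run.
    def handleFailedTask(tid: Long): Unit = synchronized {
      pendingTaskIds = tid :: pendingTaskIds
    }

    def hasPendingTasks: Boolean = synchronized { pendingTaskIds.nonEmpty }
  }

  class ToyBackend(manager: ToyTaskSetManager) {
    // Models backend.reviveOffers(): it can only relaunch work that the
    // manager already knows is pending.
    def reviveOffers(): Unit =
      if (manager.hasPendingTasks)
        println("offer made: failed task gets relaunched")
      else
        println("no pending tasks visible yet: nothing relaunched (scheduler can hang)")
  }

  def main(args: Array[String]): Unit = {
    val manager = new ToyTaskSetManager
    val backend = new ToyBackend(manager)

    // Buggy ordering: offers are revived before the manager records the failure.
    backend.reviveOffers()       // prints "no pending tasks visible yet ..."
    manager.handleFailedTask(7L)

    // Fixed ordering: update the manager first, then revive offers, so the
    // failed task is actually rescheduled.
    backend.reviveOffers()       // prints "offer made: failed task gets relaunched"
  }
}

The patch below removes the early reviveOffers() call from statusUpdate() and instead revives offers in handleFailedTask(), after taskSetManager.handleFailedTask() has updated the pending-task state.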
@@ -249,7 +249,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
     var failedExecutor: Option[String] = None
-    var taskFailed = false
     synchronized {
       try {
         if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
@@ -269,9 +268,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
               }
               taskIdToExecutorId.remove(tid)
             }
-            if (state == TaskState.FAILED) {
-              taskFailed = true
-            }
             activeTaskSets.get(taskSetId).foreach { taskSet =>
               if (state == TaskState.FINISHED) {
                 taskSet.removeRunningTask(tid)
@@ -293,10 +289,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       dagScheduler.executorLost(failedExecutor.get)
       backend.reviveOffers()
     }
-    if (taskFailed) {
-      // Also revive offers if a task had failed for some reason other than host lost
-      backend.reviveOffers()
-    }
   }
 
   def handleTaskGettingResult(taskSetManager: ClusterTaskSetManager, tid: Long) {
@@ -316,8 +308,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       taskState: TaskState,
       reason: Option[TaskEndReason]) = synchronized {
     taskSetManager.handleFailedTask(tid, taskState, reason)
-    if (taskState == TaskState.FINISHED) {
-      // The task finished successfully but the result was lost, so we should revive offers.
+    if (taskState != TaskState.KILLED) {
+      // Need to revive offers again now that the task set manager state has been updated to
+      // reflect failed tasks that need to be re-run.
       backend.reviveOffers()
     }
   }