Commit c06a307c authored by Reynold Xin's avatar Reynold Xin

Merge pull request #445 from kayousterhout/exec_lost

Fail rather than hanging if a task crashes the JVM.

Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked as KILLED rather
than FAILED.  As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this makes the job hang, because the Standalone Scheduler removes
the application after 10 workers have failed, and then the app is left
in a state where it's disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
KILLED when an executor is lost.

The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
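
The mechanism behind the hang can be sketched in a few lines. This is a simplified model (all names here are hypothetical, not Spark's actual TaskSetManager code): only a FAILED state increments the per-task failure counter that eventually aborts the job, so a task that keeps coming back as KILLED is re-enqueued forever.

```scala
// Hypothetical sketch of the retry-counting logic described above; not the
// actual Spark implementation. maxFailures = 4 mirrors the test's expectation
// that the job fails after "failed 4 times".
object RetrySketch {
  sealed trait TaskState
  case object FAILED extends TaskState
  case object KILLED extends TaskState

  val maxFailures = 4

  // Simulates retrying a task whose executor is repeatedly lost, with the lost
  // task reported in the given state each time. Returns Some(attempts) if the
  // job aborts after maxFailures, or None if it would retry forever
  // (simulated here with a finite cap).
  def attemptsUntilAbort(stateOnLoss: TaskState, cap: Int = 100): Option[Int] = {
    var failures = 0
    var attempts = 0
    while (attempts < cap) {
      attempts += 1
      stateOnLoss match {
        case FAILED => failures += 1       // counts toward maxFailures
        case KILLED =>                     // counter untouched: retried indefinitely
      }
      if (failures >= maxFailures) return Some(attempts)
    }
    None
  }
}
```

With the pre-commit behavior (`KILLED`) the loop never terminates on its own; with the fix (`FAILED`) the job aborts after four attempts.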
parents 84595ea3 718a13c1
@@ -629,7 +629,7 @@ private[spark] class TaskSetManager(
     }
     // Also re-enqueue any tasks that were running on the node
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
-      handleFailedTask(tid, TaskState.KILLED, None)
+      handleFailedTask(tid, TaskState.FAILED, None)
     }
   }
@@ -125,6 +125,23 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(thrown.getMessage.contains("failed 4 times"))
   }
+  test("repeatedly failing task that crashes JVM") {
+    // Ensures that if a task fails in a way that crashes the JVM, the job eventually fails rather
+    // than hanging due to retrying the failed task infinitely many times (eventually the
+    // standalone scheduler will remove the application, causing the job to hang waiting to
+    // reconnect to the master).
+    sc = new SparkContext(clusterUrl, "test")
+    failAfter(Span(100000, Millis)) {
+      val thrown = intercept[SparkException] {
+        // One of the tasks always fails.
+        sc.parallelize(1 to 10, 2).foreach { x => if (x == 1) System.exit(42) }
+      }
+      assert(thrown.getClass === classOf[SparkException])
+      System.out.println(thrown.getMessage)
+      assert(thrown.getMessage.contains("failed 4 times"))
+    }
+  }
   test("caching") {
     sc = new SparkContext(clusterUrl, "test")
     val data = sc.parallelize(1 to 1000, 10).cache()