Commit 35b6358a authored by Ankur Dave

Report errors in tasks to the driver via a Mesos status update

When a task throws an exception, the Spark executor previously just
logged it to a local file on the slave and exited. This commit causes
Spark to also report the exception back to the driver using a Mesos
status update, so the user doesn't have to look through a log file on
the slave.

Here's what the reporting currently looks like:

    # ./run spark.examples.ExceptionHandlingTest master@203.0.113.1:5050
    [...]
    11/10/26 21:04:13 INFO spark.SimpleJob: Lost TID 1 (task 0:1)
    11/10/26 21:04:13 INFO spark.SimpleJob: Loss was due to java.lang.Exception: Testing exception handling
    [...]
    11/10/26 21:04:16 INFO spark.SparkContext: Job finished in 5.988547328 s
parent 07532021
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
             .build())
         }
         case t: Throwable => {
+          val reason = OtherFailure(t.toString())
+          d.sendStatusUpdate(TaskStatus.newBuilder()
+            .setTaskId(desc.getTaskId)
+            .setState(TaskState.TASK_FAILED)
+            .setData(ByteString.copyFrom(Utils.serialize(reason)))
+            .build())
+
           // TODO: Handle errors in tasks less dramatically
           logError("Exception in task ID " + tid, t)
           System.exit(1)
...
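Before the driver-side pattern match in the next hunk can run, the scheduler has to decode the bytes the executor packed into the status update. That decoding step is not part of this diff; here is a minimal sketch of what it could look like, assuming a `Utils.deserialize` counterpart to the `Utils.serialize` call above and a `TaskEndReason` trait that `OtherFailure` extends (both names are assumptions, not shown in the diff):

    // Sketch only: a driver-side Mesos Scheduler callback decoding the
    // failure reason the executor serialized into the status update.
    // Utils.deserialize and TaskEndReason are assumed names.
    override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
      if (status.getState == TaskState.TASK_FAILED && !status.getData.isEmpty) {
        val reason = Utils.deserialize[TaskEndReason](status.getData.toByteArray)
        logInfo("Task %s failed: %s".format(status.getTaskId.getValue, reason))
      }
    }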
@@ -229,6 +229,8 @@ extends Job(jobId) with Logging
           if (tasksFinished == numTasks)
             sched.jobFinished(this)
           return
+        case otherFailure: OtherFailure =>
+          logInfo("Loss was due to %s".format(otherFailure.message))
         case _ => {}
       }
     }
...
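`OtherFailure` itself is just a case class wrapping the exception's string form, which is why the executor can build it from `t.toString()` and the job can read back `.message`. A sketch of the assumed shape (the `TaskEndReason` trait name and the `Serializable` bound are assumptions, not part of this diff):

    // Assumed shape: must be serializable to survive the
    // Utils.serialize/deserialize round trip through the status update.
    sealed trait TaskEndReason extends Serializable
    case class OtherFailure(message: String) extends TaskEndReason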
package spark.examples

import spark.SparkContext

object ExceptionHandlingTest {
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.err.println("Usage: ExceptionHandlingTest <host>")
      System.exit(1)
    }

    val sc = new SparkContext(args(0), "ExceptionHandlingTest")
    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
      if (Math.random > 0.75)
        throw new Exception("Testing exception handling")
    }
  }
}
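Since each task throws only when `Math.random > 0.75`, roughly a quarter of the tasks fail on any given run; with the default parallelism providing several partitions, a run is very likely to exercise the new failure-reporting path at least once, as in the transcript above.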