From 35b6358a7c4e9558789577e07c1953c9008d3e9c Mon Sep 17 00:00:00 2001
From: Ankur Dave <ankurdave@gmail.com>
Date: Wed, 26 Oct 2011 21:07:17 +0000
Subject: [PATCH] Report errors in tasks to the driver via a Mesos status
 update

When a task throws an exception, the Spark executor previously just
logged it to a local file on the slave and exited. This commit causes
Spark to also report the exception back to the driver using a Mesos
status update, so the user doesn't have to look through a log file on
the slave.

Here's what the reporting currently looks like:

    # ./run spark.examples.ExceptionHandlingTest master@203.0.113.1:5050
    [...]
    11/10/26 21:04:13 INFO spark.SimpleJob: Lost TID 1 (task 0:1)
    11/10/26 21:04:13 INFO spark.SimpleJob: Loss was due to java.lang.Exception: Testing exception handling
    [...]
    11/10/26 21:04:16 INFO spark.SparkContext: Job finished in 5.988547328 s
---
 core/src/main/scala/spark/Executor.scala       |  7 +++++++
 core/src/main/scala/spark/SimpleJob.scala      |  2 ++
 .../spark/examples/ExceptionHandlingTest.scala | 18 ++++++++++++++++++
 3 files changed, 27 insertions(+)
 create mode 100644 examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala

diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index e1256a229e..31ba122baf 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -87,6 +87,13 @@ class Executor extends org.apache.mesos.Executor with Logging {
                              .build())
         }
         case t: Throwable => {
+          val reason = OtherFailure(t.toString())
+          d.sendStatusUpdate(TaskStatus.newBuilder()
+                             .setTaskId(desc.getTaskId)
+                             .setState(TaskState.TASK_FAILED)
+                             .setData(ByteString.copyFrom(Utils.serialize(reason)))
+                             .build())
+
           // TODO: Handle errors in tasks less dramatically
           logError("Exception in task ID " + tid, t)
           System.exit(1)
diff --git a/core/src/main/scala/spark/SimpleJob.scala b/core/src/main/scala/spark/SimpleJob.scala
index d982a75ba0..6a27f159c4 100644
--- a/core/src/main/scala/spark/SimpleJob.scala
+++ b/core/src/main/scala/spark/SimpleJob.scala
@@ -229,6 +229,8 @@ extends Job(jobId) with Logging
             if (tasksFinished == numTasks)
               sched.jobFinished(this)
             return
+          case otherFailure: OtherFailure =>
+            logInfo("Loss was due to %s".format(otherFailure.message))
           case _ => {}
         }
       }
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
new file mode 100644
index 0000000000..46f658eab2
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark.SparkContext
+
+object ExceptionHandlingTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: ExceptionHandlingTest <host>")
+      System.exit(1)
+    }
+
+    val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+    sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
+      if (Math.random > 0.75)
+        throw new Exception("Testing exception handling")
+    }
+  }
+}
-- 
GitLab