From 358ae1534d01ad9e69364a21441a7ef23c2cb516 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Mon, 30 Jun 2014 11:50:22 -0700
Subject: [PATCH] [SPARK-2322] Exception in resultHandler should NOT crash
 DAGScheduler and shutdown SparkContext.

This should go into 1.0.1.

Author: Reynold Xin <rxin@apache.org>

Closes #1264 from rxin/SPARK-2322 and squashes the following commits:

c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
---
 .../org/apache/spark/SparkException.scala     |  8 +++
 .../apache/spark/scheduler/DAGScheduler.scala | 20 +++++--
 .../spark/scheduler/DAGSchedulerSuite.scala   | 56 ++++++++++++++++++-
 3 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala
index 4351ed74b6..2ebd7a7151 100644
--- a/core/src/main/scala/org/apache/spark/SparkException.scala
+++ b/core/src/main/scala/org/apache/spark/SparkException.scala
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)
 
   def this(message: String) = this(message, null)
 }
+
+/**
+ * Exception thrown when execution of some user code in the driver process fails, e.g.
+ * accumulator update fails or failure in takeOrdered (user supplies an Ordering implementation
+ * that can be misbehaving.
+ */
+private[spark] class SparkDriverExecutionException(cause: Throwable)
+  extends SparkException("Execution error", cause)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c8559a7a82..813a9abfaf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -581,8 +581,9 @@ class DAGScheduler(
       }
     } catch {
       case e: Exception =>
-        jobResult = JobFailed(e)
-        job.listener.jobFailed(e)
+        val exception = new SparkDriverExecutionException(e)
+        jobResult = JobFailed(exception)
+        job.listener.jobFailed(exception)
       case oom: OutOfMemoryError =>
         val exception = new SparkException("Local job aborted due to out of memory error", oom)
         jobResult = JobFailed(exception)
@@ -822,6 +823,7 @@ class DAGScheduler(
       case Success =>
         logInfo("Completed " + task)
         if (event.accumUpdates != null) {
+          // TODO: fail the stage if the accumulator update fails...
           Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
         }
         pendingTasks(stage) -= task
@@ -838,7 +840,16 @@ class DAGScheduler(
                     cleanupStateForJobAndIndependentStages(job, Some(stage))
                     listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
-                  job.listener.taskSucceeded(rt.outputId, event.result)
+
+                  // taskSucceeded runs some user code that might throw an exception. Make sure
+                  // we are resilient against that.
+                  try {
+                    job.listener.taskSucceeded(rt.outputId, event.result)
+                  } catch {
+                    case e: Exception =>
+                      // TODO: Perhaps we want to mark the stage as failed?
+                      job.listener.jobFailed(new SparkDriverExecutionException(e))
+                  }
                 }
               case None =>
                 logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -1161,8 +1172,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
   override val supervisorStrategy =
     OneForOneStrategy() {
       case x: Exception =>
-        logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
-          .format(x.getMessage))
+        logError("eventProcesserActor failed; shutting down SparkContext", x)
         try {
           dagScheduler.doCancelAllJobs()
         } catch {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8dd2a9b9f7..9f498d579a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import scala.Tuple2
 import scala.collection.mutable.{HashSet, HashMap, Map}
 import scala.language.reflectiveCalls
 
@@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor {
   }
 }
 
+class DAGSchedulerSuiteDummyException extends Exception
+
 class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
   with ImplicitSender with BeforeAndAfter with LocalSparkContext {
 
@@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  // TODO: Fix this and un-ignore the test.
+  ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
+    val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
+      override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
+      override def zero(initialValue: Int): Int = 0
+      override def addInPlace(r1: Int, r2: Int): Int = {
+        throw new DAGSchedulerSuiteDummyException
+      }
+    })
+
+    // Run this on executors
+    intercept[SparkDriverExecutionException] {
+      sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
+    }
+
+    // Run this within a local thread
+    intercept[SparkDriverExecutionException] {
+      sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
+    }
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
+  test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
+    val e1 = intercept[SparkDriverExecutionException] {
+      val rdd = sc.parallelize(1 to 10, 2)
+      sc.runJob[Int, Int](
+        rdd,
+        (context: TaskContext, iter: Iterator[Int]) => iter.size,
+        Seq(0),
+        allowLocal = true,
+        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+    }
+    assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+    val e2 = intercept[SparkDriverExecutionException] {
+      val rdd = sc.parallelize(1 to 10, 2)
+      sc.runJob[Int, Int](
+        rdd,
+        (context: TaskContext, iter: Iterator[Int]) => iter.size,
+        Seq(0, 1),
+        allowLocal = false,
+        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
+    }
+    assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
   test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
     val actorSystem = ActorSystem("test")
     val supervisor = actorSystem.actorOf(
-- 
GitLab