Commit 358ae153 authored by Reynold Xin

[SPARK-2322] Exception in resultHandler should NOT crash DAGScheduler and shutdown SparkContext.

This should go into 1.0.1.

Author: Reynold Xin <rxin@apache.org>

Closes #1264 from rxin/SPARK-2322 and squashes the following commits:

c77c07f [Reynold Xin] Added comment to SparkDriverExecutionException and a test case for accumulator.
5d8d920 [Reynold Xin] [SPARK-2322] Exception in resultHandler could crash DAGScheduler and shutdown SparkContext.
parent 68036422
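To make the failure mode concrete, below is a minimal driver-side sketch of the scenario this patch guards against (a hypothetical program with a made-up error message, assuming a local-mode SparkContext). The resultHandler passed to SparkContext.runJob is user code executed on the driver; with this change, an exception thrown there is wrapped in SparkDriverExecutionException and rethrown to the caller instead of crashing the DAGScheduler and shutting down the SparkContext:

import org.apache.spark.{SparkContext, SparkException, TaskContext}

object ResultHandlerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "result-handler-demo")
    val rdd = sc.parallelize(1 to 10, 2)
    try {
      sc.runJob[Int, Int](
        rdd,
        (context: TaskContext, iter: Iterator[Int]) => iter.size,
        Seq(0, 1),
        allowLocal = false,
        // The user-supplied resultHandler runs on the driver and misbehaves here.
        (partition: Int, result: Int) => throw new RuntimeException("boom"))
    } catch {
      // The concrete type is the private[spark] SparkDriverExecutionException, which
      // extends SparkException, so user code catches it as SparkException and can
      // inspect the original failure via getCause.
      case e: SparkException => println("resultHandler failed: " + e.getCause)
    }
    // The SparkContext survives and can still run jobs.
    assert(rdd.count() == 10)
    sc.stop()
  }
}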
@@ -22,3 +22,11 @@ class SparkException(message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
/**
* Exception thrown when execution of some user code in the driver process fails, e.g.
* an accumulator update fails, or a failure in takeOrdered (the user supplies an Ordering
* implementation that misbehaves).
*/
private[spark] class SparkDriverExecutionException(cause: Throwable)
extends SparkException("Execution error", cause)
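The wrapper adds no state of its own; it tags driver-side user-code failures and preserves the original error as the cause. A tiny illustrative sketch (hypothetical object name; it must live in the org.apache.spark package because the class is private[spark], and it assumes SparkException forwards message and cause to java.lang.Exception):

package org.apache.spark

object SparkDriverExecutionExceptionDemo {
  def main(args: Array[String]): Unit = {
    val userError = new IllegalStateException("misbehaving Ordering from user code")
    val wrapped = new SparkDriverExecutionException(userError)
    // Fixed message from the constructor above; the user failure is kept as the cause.
    assert(wrapped.getMessage == "Execution error")
    assert(wrapped.getCause eq userError)
  }
}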
@@ -581,8 +581,9 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
jobResult = JobFailed(e)
job.listener.jobFailed(e)
val exception = new SparkDriverExecutionException(e)
jobResult = JobFailed(exception)
job.listener.jobFailed(exception)
case oom: OutOfMemoryError =>
val exception = new SparkException("Local job aborted due to out of memory error", oom)
jobResult = JobFailed(exception)
@@ -822,6 +823,7 @@ class DAGScheduler(
case Success =>
logInfo("Completed " + task)
if (event.accumUpdates != null) {
// TODO: fail the stage if the accumulator update fails...
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
pendingTasks(stage) -= task
@@ -838,7 +840,16 @@ class DAGScheduler(
cleanupStateForJobAndIndependentStages(job, Some(stage))
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
// TODO: Perhaps we want to mark the stage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
}
case None =>
logInfo("Ignoring result from " + rt + " because its job has finished")
@@ -1161,8 +1172,7 @@ private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
override val supervisorStrategy =
OneForOneStrategy() {
case x: Exception =>
logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
.format(x.getMessage))
logError("eventProcesserActor failed; shutting down SparkContext", x)
try {
dagScheduler.doCancelAllJobs()
} catch {
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
import scala.Tuple2
import scala.collection.mutable.{HashSet, HashMap, Map}
import scala.language.reflectiveCalls
@@ -38,6 +37,8 @@ class BuggyDAGEventProcessActor extends Actor {
}
}
class DAGSchedulerSuiteDummyException extends Exception
class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
with ImplicitSender with BeforeAndAfter with LocalSparkContext {
@@ -593,6 +594,59 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
assertDataStructuresEmpty
}
// TODO: Fix this and un-ignore the test.
ignore("misbehaved accumulator should not crash DAGScheduler and SparkContext") {
val acc = new Accumulator[Int](0, new AccumulatorParam[Int] {
override def addAccumulator(t1: Int, t2: Int): Int = t1 + t2
override def zero(initialValue: Int): Int = 0
override def addInPlace(r1: Int, r2: Int): Int = {
throw new DAGSchedulerSuiteDummyException
}
})
// Run this on executors
intercept[SparkDriverExecutionException] {
sc.parallelize(1 to 10, 2).foreach { item => acc.add(1) }
}
// Run this within a local thread
intercept[SparkDriverExecutionException] {
sc.parallelize(1 to 10, 2).map { item => acc.add(1) }.take(1)
}
// Make sure we can still run local commands as well as cluster commands.
assert(sc.parallelize(1 to 10, 2).count() === 10)
assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
val e1 = intercept[SparkDriverExecutionException] {
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
Seq(0),
allowLocal = true,
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
assert(e1.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
val e2 = intercept[SparkDriverExecutionException] {
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
Seq(0, 1),
allowLocal = false,
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
assert(e2.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
// Make sure we can still run local commands as well as cluster commands.
assert(sc.parallelize(1 to 10, 2).count() === 10)
assert(sc.parallelize(1 to 10, 2).first() === 1)
}
test("DAGSchedulerActorSupervisor closes the SparkContext when EventProcessActor crashes") {
val actorSystem = ActorSystem("test")
val supervisor = actorSystem.actorOf(