Commit 0b5abbf5 authored by Josh Rosen

[SPARK-8606] Prevent exceptions in RDD.getPreferredLocations() from crashing DAGScheduler

If `RDD.getPreferredLocations()` throws an exception, it can crash the DAGScheduler and take the SparkContext down with it. This patch fixes that by wrapping task creation in a try-catch block, so the exception aborts only the affected stage instead of the scheduler's event loop.
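For context, a minimal reproduction sketch of the failure mode (hypothetical user code, not part of this commit; the class and object names are illustrative):

import org.apache.spark.{Partition, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical RDD whose getPreferredLocations always throws,
// mirroring the regression test added below.
class BuggyLocationsRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
  override def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })
  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(1)
  override def getPreferredLocations(split: Partition): Seq[String] =
    throw new IllegalStateException("buggy preferred locations")
}

object Spark8606Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("SPARK-8606"))
    // Before this patch: the exception escaped the scheduler's event loop
    // and killed the SparkContext, so every later job on sc failed too.
    // After this patch: the stage is aborted, the caller sees a
    // SparkException, and sc remains usable.
    try new BuggyLocationsRDD(sc).count()
    finally sc.stop()
  }
}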

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7023 from JoshRosen/SPARK-8606 and squashes the following commits:

770b169 [Josh Rosen] Fix getPreferredLocations() DAGScheduler crash with try block.
44a9b55 [Josh Rosen] Add test of a buggy getPartitions() method
19aa9f7 [Josh Rosen] Add (failing) regression test for getPreferredLocations() DAGScheduler crash
parent 4153776f
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -907,22 +907,29 @@ class DAGScheduler(
       return
     }
 
-    val tasks: Seq[Task[_]] = stage match {
-      case stage: ShuffleMapStage =>
-        partitionsToCompute.map { id =>
-          val locs = getPreferredLocs(stage.rdd, id)
-          val part = stage.rdd.partitions(id)
-          new ShuffleMapTask(stage.id, taskBinary, part, locs)
-        }
-
-      case stage: ResultStage =>
-        val job = stage.resultOfJob.get
-        partitionsToCompute.map { id =>
-          val p: Int = job.partitions(id)
-          val part = stage.rdd.partitions(p)
-          val locs = getPreferredLocs(stage.rdd, p)
-          new ResultTask(stage.id, taskBinary, part, locs, id)
-        }
-    }
+    val tasks: Seq[Task[_]] = try {
+      stage match {
+        case stage: ShuffleMapStage =>
+          partitionsToCompute.map { id =>
+            val locs = getPreferredLocs(stage.rdd, id)
+            val part = stage.rdd.partitions(id)
+            new ShuffleMapTask(stage.id, taskBinary, part, locs)
+          }
+
+        case stage: ResultStage =>
+          val job = stage.resultOfJob.get
+          partitionsToCompute.map { id =>
+            val p: Int = job.partitions(id)
+            val part = stage.rdd.partitions(p)
+            val locs = getPreferredLocs(stage.rdd, p)
+            new ResultTask(stage.id, taskBinary, part, locs, id)
+          }
+      }
+    } catch {
+      case NonFatal(e) =>
+        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")
+        runningStages -= stage
+        return
+    }
 
     if (tasks.size > 0) {
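Note that the new catch clause matches scala.util.control.NonFatal rather than Throwable, so JVM-fatal errors (VirtualMachineError, InterruptedException, LinkageError, ControlThrowable, and the like) still propagate instead of being swallowed. A standalone sketch of that distinction (illustration only, not Spark code):

import scala.util.control.NonFatal

object NonFatalDemo {
  // Runs body, catching only exceptions that NonFatal considers recoverable.
  def attempt(body: => Unit): Unit =
    try body catch {
      case NonFatal(e) => println(s"caught non-fatal: $e")
    }

  def main(args: Array[String]): Unit = {
    attempt(throw new IllegalStateException("boom")) // caught: ordinary exceptions are non-fatal
    // attempt(throw new OutOfMemoryError())         // NOT caught: NonFatal rejects VM-fatal errors
  }
}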
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -784,6 +784,37 @@ class DAGSchedulerSuite
     assert(sc.parallelize(1 to 10, 2).first() === 1)
   }
 
+  test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+    val e1 = intercept[DAGSchedulerSuiteDummyException] {
+      val rdd = new MyRDD(sc, 2, Nil) {
+        override def getPartitions: Array[Partition] = {
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      rdd.reduceByKey(_ + _, 1).count()
+    }
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
+  test("getPreferredLocations errors should not crash DAGScheduler and SparkContext (SPARK-8606)") {
+    val e1 = intercept[SparkException] {
+      val rdd = new MyRDD(sc, 2, Nil) {
+        override def getPreferredLocations(split: Partition): Seq[String] = {
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      rdd.count()
+    }
+    assert(e1.getMessage.contains(classOf[DAGSchedulerSuiteDummyException].getName))
+
+    // Make sure we can still run local commands as well as cluster commands.
+    assert(sc.parallelize(1 to 10, 2).count() === 10)
+    assert(sc.parallelize(1 to 10, 2).first() === 1)
+  }
+
   test("accumulator not calculated for resubmitted result stage") {
     // just for register
     val accum = new Accumulator[Int](0, AccumulatorParam.IntAccumulatorParam)
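Note the asymmetry in what the two tests expect. A getPartitions failure is hit during stage creation, a path where the DAGScheduler already catches the exception and fails the job with the original throwable, so the first test intercepts DAGSchedulerSuiteDummyException directly. A getPreferredLocations failure is hit during task creation in submitMissingTasks, the path this patch guards: abortStage wraps the failure, so the caller instead sees a SparkException whose message contains the original exception's class name.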