diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index acb4c4946eded18c29498a962357b7f9c209d664..00b8af27a7b39b682631f729cf4c5599138754e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -710,7 +710,6 @@ class DAGScheduler(
       if (missing == Nil) {
         logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
         submitMissingTasks(stage, jobId.get)
-        runningStages += stage
       } else {
         for (parent <- missing) {
           submitStage(parent)
@@ -753,11 +752,14 @@ class DAGScheduler(
         null
       }
 
-    // must be run listener before possible NotSerializableException
-    // should be "StageSubmitted" first and then "JobEnded"
-    listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
-
     if (tasks.size > 0) {
+      runningStages += stage
+      // SparkListenerStageSubmitted should be posted before testing whether tasks are
+      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
+      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
+      // event.
+      listenerBus.post(SparkListenerStageSubmitted(stageToInfos(stage), properties))
+
       // Preemptively serialize a task to make sure it can be serialized. We are catching this
       // exception here because it would be fairly hard to catch the non-serializable exception
       // down the road, where we have several different implementations for local scheduler and
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9f498d579a0956734da261d48b205c18d4f6a1eb..44dd1e092ad67c5bd8732bbc6bbda0426b7a0e65 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -37,6 +37,29 @@ class BuggyDAGEventProcessActor extends Actor {
   }
 }
 
+/**
+ * An RDD for passing to DAGScheduler. These RDDs will use the dependencies and
+ * preferredLocations (if any) that are passed to them. They are deliberately not executable
+ * so we can test that DAGScheduler does not try to execute RDDs locally.
+ */
+class MyRDD(
+    sc: SparkContext,
+    numPartitions: Int,
+    dependencies: List[Dependency[_]],
+    locations: Seq[Seq[String]] = Nil) extends RDD[(Int, Int)](sc, dependencies) with Serializable {
+  override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+    throw new RuntimeException("should not be reached")
+  override def getPartitions = (0 until numPartitions).map(i => new Partition {
+    override def index = i
+  }).toArray
+  override def getPreferredLocations(split: Partition): Seq[String] =
+    if (locations.isDefinedAt(split.index))
+      locations(split.index)
+    else
+      Nil
+  override def toString: String = "DAGSchedulerSuiteRDD " + id
+}
+
 class DAGSchedulerSuiteDummyException extends Exception
 
 class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with FunSuiteLike
@@ -148,34 +171,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    * Type of RDD we use for testing. Note that we should never call the real RDD compute methods.
    * This is a pair RDD type so it can always be used in ShuffleDependencies.
    */
-  type MyRDD = RDD[(Int, Int)]
-
-  /**
-   * Create an RDD for passing to DAGScheduler. These RDDs will use the dependencies and
-   * preferredLocations (if any) that are passed to them. They are deliberately not executable
-   * so we can test that DAGScheduler does not try to execute RDDs locally.
-   */
-  private def makeRdd(
-        numPartitions: Int,
-        dependencies: List[Dependency[_]],
-        locations: Seq[Seq[String]] = Nil
-      ): MyRDD = {
-    val maxPartition = numPartitions - 1
-    val newRDD = new MyRDD(sc, dependencies) {
-      override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
-        throw new RuntimeException("should not be reached")
-      override def getPartitions = (0 to maxPartition).map(i => new Partition {
-        override def index = i
-      }).toArray
-      override def getPreferredLocations(split: Partition): Seq[String] =
-        if (locations.isDefinedAt(split.index))
-          locations(split.index)
-        else
-          Nil
-      override def toString: String = "DAGSchedulerSuiteRDD " + id
-    }
-    newRDD
-  }
+  type PairOfIntsRDD = RDD[(Int, Int)]
 
   /**
    * Process the supplied event as if it were the top of the DAGScheduler event queue, expecting
@@ -234,19 +230,19 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def taskSucceeded(partition: Int, value: Any) = numResults += 1
       override def jobFailed(exception: Exception) = throw exception
     }
-    submit(makeRdd(0, Nil), Array(), listener = fakeListener)
+    submit(new MyRDD(sc, 0, Nil), Array(), listener = fakeListener)
     assert(numResults === 0)
   }
 
   test("run trivial job") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
     complete(taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
 
   test("local job") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         Array(42 -> 0).iterator
       override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -260,7 +256,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("local job oom") {
-    val rdd = new MyRDD(sc, Nil) {
+    val rdd = new PairOfIntsRDD(sc, Nil) {
       override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
         throw new java.lang.OutOfMemoryError("test local job oom")
       override def getPartitions = Array( new Partition { override def index = 0 } )
@@ -274,8 +270,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial job w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
     complete(taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
@@ -283,8 +279,8 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("cache location preferences w/ dependency") {
-    val baseRdd = makeRdd(1, Nil)
-    val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
+    val baseRdd = new MyRDD(sc, 1, Nil)
+    val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     cacheLocations(baseRdd.id -> 0) =
       Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
     submit(finalRdd, Array(0))
@@ -295,8 +291,22 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     assertDataStructuresEmpty
   }
 
+  test("unserializable task") {
+    val unserializableRdd = new MyRDD(sc, 1, Nil) {
+      class UnserializableClass
+      val unserializable = new UnserializableClass
+    }
+    submit(unserializableRdd, Array(0))
+    assert(failure.getMessage.startsWith(
+      "Job aborted due to stage failure: Task not serializable:"))
+    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    assert(sparkListener.failedStages.contains(0))
+    assert(sparkListener.failedStages.size === 1)
+    assertDataStructuresEmpty
+  }
+
   test("trivial job failure") {
-    submit(makeRdd(1, Nil), Array(0))
+    submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
@@ -306,7 +316,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("trivial job cancellation") {
-    val rdd = makeRdd(1, Nil)
+    val rdd = new MyRDD(sc, 1, Nil)
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
@@ -347,8 +357,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
     }
     dagEventProcessTestActor = TestActorRef[DAGSchedulerEventProcessActor](
      Props(classOf[DAGSchedulerEventProcessActor], noKillScheduler))(system)
-    val rdd = makeRdd(1, Nil)
-    val jobId = submit(rdd, Array(0))
+    val jobId = submit(new MyRDD(sc, 1, Nil), Array(0))
     cancel(jobId)
     // Because the job wasn't actually cancelled, we shouldn't have received a failure message.
     assert(failure === null)
@@ -364,10 +373,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
@@ -380,10 +389,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle with fetch failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
@@ -406,10 +415,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("ignore late map task completions") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     // pretend we were told hostA went away
     val oldEpoch = mapOutputTracker.getEpoch
@@ -435,9 +444,9 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run shuffle with map stage failure") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
-    val reduceRdd = makeRdd(2, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
 
     // Fail the map stage. This should cause the entire job to fail.
@@ -472,13 +481,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    * without shuffleMapRdd1.
    */
  test("failure of stage used by two jobs") {
-    val shuffleMapRdd1 = makeRdd(2, Nil)
+    val shuffleMapRdd1 = new MyRDD(sc, 2, Nil)
     val shuffleDep1 = new ShuffleDependency(shuffleMapRdd1, null)
-    val shuffleMapRdd2 = makeRdd(2, Nil)
+    val shuffleMapRdd2 = new MyRDD(sc, 2, Nil)
     val shuffleDep2 = new ShuffleDependency(shuffleMapRdd2, null)
-    val reduceRdd1 = makeRdd(2, List(shuffleDep1))
-    val reduceRdd2 = makeRdd(2, List(shuffleDep1, shuffleDep2))
+    val reduceRdd1 = new MyRDD(sc, 2, List(shuffleDep1))
+    val reduceRdd2 = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep2))
 
     // We need to make our own listeners for this test, since by default submit uses the same
     // listener for all jobs, and here we want to capture the failure for each job separately.
@@ -511,10 +520,10 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("run trivial shuffle with out-of-band failure and retry") {
-    val shuffleMapRdd = makeRdd(2, Nil)
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
-    val reduceRdd = makeRdd(1, List(shuffleDep))
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
     submit(reduceRdd, Array(0))
     // blockManagerMaster.removeExecutor("exec-hostA")
     // pretend we were told hostA went away
@@ -534,11 +543,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("recursive shuffle failures") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
     val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     // have the first stage complete normally
     complete(taskSets(0), Seq(
@@ -563,11 +572,11 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
   }
 
   test("cached post-shuffle") {
-    val shuffleOneRdd = makeRdd(2, Nil)
+    val shuffleOneRdd = new MyRDD(sc, 2, Nil)
     val shuffleDepOne = new ShuffleDependency(shuffleOneRdd, null)
-    val shuffleTwoRdd = makeRdd(2, List(shuffleDepOne))
+    val shuffleTwoRdd = new MyRDD(sc, 2, List(shuffleDepOne))
     val shuffleDepTwo = new ShuffleDependency(shuffleTwoRdd, null)
-    val finalRdd = makeRdd(1, List(shuffleDepTwo))
+    val finalRdd = new MyRDD(sc, 1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
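
The behavioral change in the scheduler diff above is an ordering guarantee: submitMissingTasks now marks the stage as running and posts SparkListenerStageSubmitted before the preemptive serialization check, so a stage that fails that check still emits its failure event after a matching submitted event. The following is a minimal, self-contained Scala sketch of that invariant only; it is not Spark's DAGScheduler code, and the names (Event, StageSubmitted, StageCompleted, SubmitOrderingSketch, post, submitMissingTasks) are illustrative stand-ins.

import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the two listener events involved in the patch.
sealed trait Event
case class StageSubmitted(stageId: Int) extends Event
case class StageCompleted(stageId: Int, failed: Boolean) extends Event

object SubmitOrderingSketch {
  val postedEvents = ArrayBuffer.empty[Event]

  def post(e: Event): Unit = postedEvents += e

  // Mirrors the ordering the patch enforces: the "submitted" event is posted
  // before the preemptive serialization check, so a serialization failure
  // always yields a "completed" event that follows a matching "submitted" one.
  def submitMissingTasks(stageId: Int, task: AnyRef): Unit = {
    post(StageSubmitted(stageId))
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(task) // throws NotSerializableException for bad tasks
      out.close()
    } catch {
      case _: NotSerializableException =>
        post(StageCompleted(stageId, failed = true))
    }
  }

  def main(args: Array[String]): Unit = {
    class Unserializable // deliberately not Serializable, like the test RDD's field
    submitMissingTasks(0, new Unserializable)
    postedEvents.foreach(println)
    // Prints StageSubmitted(0) and then StageCompleted(0,true), never the reverse.
  }
}

The new "unserializable task" test exercises the real code path instead of a sketch like this: it submits an RDD holding a non-serializable field and checks, via sparkListener.failedStages, that the stage is reported as failed.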