diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 3816b8c4a09aa040e9e8c6de94c3f81cfb0978e9..068b49bd5844b7ce91e6118a68882447ac3d7c84 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -594,11 +594,17 @@ class DAGSchedulerSuite * @param stageId - The current stageId * @param attemptIdx - The current attempt count */ - private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = { + private def completeNextResultStageWithSuccess( + stageId: Int, + attemptIdx: Int, + partitionToResult: Int => Int = _ => 42): Unit = { val stageAttempt = taskSets.last checkStageId(stageId, attemptIdx, stageAttempt) assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage]) - complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq) + val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) => + (Success, partitionToResult(idx)) + } + complete(stageAttempt, taskResults.toSeq) } /** @@ -1054,6 +1060,47 @@ class DAGSchedulerSuite assertDataStructuresEmpty() } + /** + * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which + * requires regenerating some outputs of the shared dependency. One key aspect of this test is + * that the second job actually uses a different stage for the shared dependency (a "skipped" + * stage). + */ + test("shuffle fetch failure in a reused shuffle dependency") { + // Run the first job successfully, which creates one shuffle dependency + + val shuffleMapRdd = new MyRDD(sc, 2, Nil) + val shuffleDep = new ShuffleDependency(shuffleMapRdd, null) + val reduceRdd = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduceRdd, Array(0, 1)) + + completeShuffleMapStageSuccessfully(0, 0, 2) + completeNextResultStageWithSuccess(1, 0) + assert(results === Map(0 -> 42, 1 -> 42)) + assertDataStructuresEmpty() + + // submit another job w/ the shared dependency, and have a fetch failure + val reduce2 = new MyRDD(sc, 2, List(shuffleDep)) + submit(reduce2, Array(0, 1)) + // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped + // stage. If instead it reused the existing stage, then this would be stage 2 + completeNextStageWithFetchFailure(3, 0, shuffleDep) + scheduler.resubmitFailedStages() + + // the scheduler now creates a new task set to regenerate the missing map output, but this time + // using a different stage, the "skipped" one + + // SPARK-9809 -- this stage is submitted without a task for each partition (because some of + // the shuffle map output is still available from stage 0); make sure we've still got internal + // accumulators setup + assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty) + completeShuffleMapStageSuccessfully(2, 0, 2) + completeNextResultStageWithSuccess(3, 1, idx => idx + 1234) + assert(results === Map(0 -> 1234, 1 -> 1235)) + + assertDataStructuresEmpty() + } + /** * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we * have completions from both the first & second attempt of stage 1. So all the map output is