Commit 33112f9c authored by Imran Rashid, committed by Andrew Or

[SPARK-10192][CORE] simple test w/ failure involving a shared dependency

Just trying to increase test coverage in the scheduler; this already works. It includes a regression test for SPARK-9809.

Copied some test utils from https://github.com/apache/spark/pull/5636; we can wait until that is merged first.

Author: Imran Rashid <irashid@cloudera.com>

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.
parent c0e48dfa
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }

   /**
@@ -1054,6 +1060,47 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }

+  /**
+   * Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency. One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage. If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+    assertDataStructuresEmpty()
+  }
+
   /**
    * This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we
    * have completions from both the first & second attempt of stage 1. So all the map output is
...
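For context, the scheduler-level scenario exercised by the new test corresponds to a common user-level pattern: two jobs that share one shuffle dependency, where the second job reuses the already-computed map output through a "skipped" stage. The sketch below is only an illustration of that pattern against the public RDD API and is not part of this commit; the object name SharedShuffleExample and the local-mode configuration are assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object SharedShuffleExample {
  def main(args: Array[String]): Unit = {
    // Illustrative local-mode setup (not part of the commit).
    val conf = new SparkConf().setAppName("shared-shuffle-example").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // One shuffle dependency, introduced by reduceByKey.
      val shuffled = sc.parallelize(1 to 100, 2)
        .map(x => (x % 10, x))
        .reduceByKey(_ + _)

      // Job 1: runs the shuffle map stage and a result stage.
      shuffled.count()

      // Job 2: shares the same shuffle dependency. The DAGScheduler registers a new
      // map stage for it but can skip execution because the map output already exists;
      // a fetch failure at this point forces parts of that skipped stage to be
      // regenerated, which is the situation the test drives through the scheduler.
      shuffled.values.sum()
    } finally {
      sc.stop()
    }
  }
}

The test reproduces this situation directly with MyRDD and the DAGSchedulerSuite helpers so that the fetch failure, the resubmission of the skipped stage, and the per-partition results can be asserted deterministically.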