From 33112f9c48680c33d663978f76806ebf0ea39789 Mon Sep 17 00:00:00 2001
From: Imran Rashid <irashid@cloudera.com>
Date: Tue, 10 Nov 2015 16:50:22 -0800
Subject: [PATCH] [SPARK-10192][CORE] simple test w/ failure involving a shared
 dependency

just trying to increase test coverage in the scheduler, this already works.  It includes a regression test for SPARK-9809

copied some test utils from https://github.com/apache/spark/pull/5636, we can wait till that is merged first

Author: Imran Rashid <irashid@cloudera.com>

Closes #8402 from squito/test_retry_in_shared_shuffle_dep.
---
 .../spark/scheduler/DAGSchedulerSuite.scala   | 51 ++++++++++++++++++-
 1 file changed, 49 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3816b8c4a0..068b49bd58 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -594,11 +594,17 @@ class DAGSchedulerSuite
    * @param stageId - The current stageId
    * @param attemptIdx - The current attempt count
    */
-  private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
+  private def completeNextResultStageWithSuccess(
+      stageId: Int,
+      attemptIdx: Int,
+      partitionToResult: Int => Int = _ => 42): Unit = {
     val stageAttempt = taskSets.last
     checkStageId(stageId, attemptIdx, stageAttempt)
     assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
-    complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
+    val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
+      (Success, partitionToResult(idx))
+    }
+    complete(stageAttempt, taskResults.toSeq)
   }
 
   /**
@@ -1054,6 +1060,47 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  /**
+   * Run two jobs, with a shared dependency.  We simulate a fetch failure in the second job, which
+   * requires regenerating some outputs of the shared dependency.  One key aspect of this test is
+   * that the second job actually uses a different stage for the shared dependency (a "skipped"
+   * stage).
+   */
+  test("shuffle fetch failure in a reused shuffle dependency") {
+    // Run the first job successfully, which creates one shuffle dependency
+
+    val shuffleMapRdd = new MyRDD(sc, 2, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduceRdd, Array(0, 1))
+
+    completeShuffleMapStageSuccessfully(0, 0, 2)
+    completeNextResultStageWithSuccess(1, 0)
+    assert(results === Map(0 -> 42, 1 -> 42))
+    assertDataStructuresEmpty()
+
+    // submit another job w/ the shared dependency, and have a fetch failure
+    val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
+    submit(reduce2, Array(0, 1))
+    // Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
+    // stage.  If instead it reused the existing stage, then this would be stage 2
+    completeNextStageWithFetchFailure(3, 0, shuffleDep)
+    scheduler.resubmitFailedStages()
+
+    // the scheduler now creates a new task set to regenerate the missing map output, but this time
+    // using a different stage, the "skipped" one
+
+    // SPARK-9809 -- this stage is submitted without a task for each partition (because some of
+    // the shuffle map output is still available from stage 0); make sure we've still got internal
+    // accumulators setup
+    assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
+    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
+    assert(results === Map(0 -> 1234, 1 -> 1235))
+
+    assertDataStructuresEmpty()
+  }
+
   /**
    * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
    * have completions from both the first & second attempt of stage 1.  So all the map output is
-- 
GitLab