From ac7fc3075b7323261346fb4cd38c26f3b8f08bc2 Mon Sep 17 00:00:00 2001
From: jinxing <jinxing6042@126.com>
Date: Wed, 31 May 2017 10:46:23 -0500
Subject: [PATCH] [SPARK-20288] Avoid generating the MapStatus by stageId in
 BasicSchedulerIntegrationSuite

## What changes were proposed in this pull request?

Shuffle ids are determined before the job is submitted, but it is hard to predict a stage id from a shuffle id.
Stages are created in DAGScheduler (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381), and the creation order is not deterministic because the shuffle dependencies are kept in a `HashSet`.
I added a log (`println(s"Creating ShuffleMapStage-$id on shuffle-${shuffleDep.shuffleId}")`) after https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331 and ran BasicSchedulerIntegrationSuite "multi-stage job". It prints either:
Creating ShuffleMapStage-0 on shuffle-0
Creating ShuffleMapStage-1 on shuffle-2
Creating ShuffleMapStage-2 on shuffle-1
Creating ShuffleMapStage-3 on shuffle-3
or
Creating ShuffleMapStage-0 on shuffle-1
Creating ShuffleMapStage-1 on shuffle-3
Creating ShuffleMapStage-2 on shuffle-0
Creating ShuffleMapStage-3 on shuffle-2
It is better to avoid generating the MapStatus by stageId and to key the expected output partitions on the shuffleId instead.
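
As a rough illustration of why the stage numbering cannot be predicted, here is a standalone sketch (not Spark code; `Dep` is a hypothetical stand-in for `ShuffleDependency`, which, as far as I can tell, relies on the default identity-based `hashCode`, so its `HashSet` iteration order can differ from run to run):

```scala
import scala.collection.mutable

// Hypothetical stand-in for ShuffleDependency: it keeps the default
// identity-based hashCode, so its bucket placement in a HashSet varies per run.
class Dep(val shuffleId: Int)

object HashSetOrderDemo {
  def main(args: Array[String]): Unit = {
    val deps = mutable.HashSet[Dep]()
    (0 to 3).foreach(i => deps += new Dep(i)) // shuffles registered as 0, 1, 2, 3

    // Stage ids are handed out in iteration order, which need not match the
    // order the shuffle dependencies were registered, so the same shuffle can
    // end up with a different stage id on every run.
    deps.toSeq.zipWithIndex.foreach { case (dep, stageId) =>
      println(s"Creating ShuffleMapStage-$stageId on shuffle-${dep.shuffleId}")
    }
  }
}
```

With this patch, the test resolves the shuffle id through `scheduler.stageIdToStage(stage).asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId`, so the expected number of output partitions no longer depends on the non-deterministic stage numbering.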

Author: jinxing <jinxing6042@126.com>

Closes #17603 from jinxing64/SPARK-20288.
---
 .../spark/scheduler/SchedulerIntegrationSuite.scala   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 37b08980db..a8249e123f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -553,10 +553,10 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    */
   testScheduler("multi-stage job") {
 
-    def stageToOutputParts(stageId: Int): Int = {
-      stageId match {
+    def shuffleIdToOutputParts(shuffleId: Int): Int = {
+      shuffleId match {
         case 0 => 10
-        case 2 => 20
+        case 1 => 20
         case _ => 30
       }
     }
@@ -577,11 +577,12 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
         // us what to check
       }
-
       (task.stageId, task.stageAttemptId, task.partitionId) match {
         case (stage, 0, _) if stage < 4 =>
+          val shuffleId =
+            scheduler.stageIdToStage(stage).asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId
           backend.taskSuccess(taskDescription,
-            DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
+            DAGSchedulerSuite.makeMapStatus("hostA", shuffleIdToOutputParts(shuffleId)))
         case (4, 0, partition) =>
           backend.taskSuccess(taskDescription, 4321 + partition)
       }
-- 
GitLab