Commit ac7fc307 authored by jinxing, committed by Imran Rashid

[SPARK-20288] Avoid generating the MapStatus by stageId in BasicSchedulerIntegrationSuite

## What changes were proposed in this pull request?

The shuffleId is determined before the job is submitted, but it is hard to predict the stageId from the shuffleId.
Stages are created in DAGScheduler (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L381), but the order is not deterministic, because it depends on iteration over a `HashSet`.
I added a log statement, println(s"Creating ShufflMapStage-$id on shuffle-${shuffleDep.shuffleId}"), after https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L331 and ran BasicSchedulerIntegrationSuite: "multi-stage job". It prints either:
Creating ShufflMapStage-0 on shuffle-0
Creating ShufflMapStage-1 on shuffle-2
Creating ShufflMapStage-2 on shuffle-1
Creating ShufflMapStage-3 on shuffle-3
or
Creating ShufflMapStage-0 on shuffle-1
Creating ShufflMapStage-1 on shuffle-3
Creating ShufflMapStage-2 on shuffle-0
Creating ShufflMapStage-3 on shuffle-2
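Since the same shuffle can land on a different stageId from run to run, it is better to avoid generating the MapStatus by stageId and to key it on the shuffleId instead.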
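To illustrate the problem, here is a minimal, self-contained sketch. It does not use Spark's real classes: `ToyShuffleDep`, `ToyShuffleMapStage`, `createStages` and `partsPerShuffle` are hypothetical stand-ins invented for this example. Only the two helper bodies (`stageToOutputParts`, `shuffleIdToOutputParts`) and the two stage/shuffle pairings are taken from this commit. It assumes stage ids are handed out in whatever order the shuffles happen to be visited.

```scala
// Toy model only: these are NOT Spark classes, just hypothetical stand-ins.
final case class ToyShuffleDep(shuffleId: Int)
final case class ToyShuffleMapStage(id: Int, shuffleDep: ToyShuffleDep)

object ShuffleIdKeyedExample {
  // Old test helper: keyed by stageId.
  def stageToOutputParts(stageId: Int): Int = stageId match {
    case 0 => 10
    case 2 => 20
    case _ => 30
  }

  // New test helper: keyed by shuffleId.
  def shuffleIdToOutputParts(shuffleId: Int): Int = shuffleId match {
    case 0 => 10
    case 1 => 20
    case _ => 30
  }

  // Assumption: stage ids are assigned in the order the shuffles are visited.
  def createStages(visitOrder: Seq[Int]): Map[Int, ToyShuffleMapStage] =
    visitOrder.zipWithIndex.map { case (shuffleId, stageId) =>
      stageId -> ToyShuffleMapStage(stageId, ToyShuffleDep(shuffleId))
    }.toMap

  // Number of output partitions each shuffle ends up with under a given rule.
  def partsPerShuffle(
      stages: Map[Int, ToyShuffleMapStage],
      rule: ToyShuffleMapStage => Int): Map[Int, Int] =
    stages.values.map(s => s.shuffleDep.shuffleId -> rule(s)).toMap

  val byStageId: ToyShuffleMapStage => Int = s => stageToOutputParts(s.id)
  val byShuffleId: ToyShuffleMapStage => Int =
    s => shuffleIdToOutputParts(s.shuffleDep.shuffleId)

  // The two pairings printed by the log above (stage 0, 1, 2, 3 in order).
  val runA: Map[Int, ToyShuffleMapStage] = createStages(Seq(0, 2, 1, 3))
  val runB: Map[Int, ToyShuffleMapStage] = createStages(Seq(1, 3, 0, 2))
}
```

With the stageId rule, shuffle-0 gets 10 partitions in the first run and 20 in the second. With the shuffleId rule, both runs give every shuffle the same partition count.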

Author: jinxing <jinxing6042@126.com>

Closes #17603 from jinxing64/SPARK-20288.
parent d52f6362
@@ -553,10 +553,10 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
    */
   testScheduler("multi-stage job") {
-    def stageToOutputParts(stageId: Int): Int = {
-      stageId match {
+    def shuffleIdToOutputParts(shuffleId: Int): Int = {
+      shuffleId match {
         case 0 => 10
-        case 2 => 20
+        case 1 => 20
         case _ => 30
       }
     }
@@ -577,11 +577,12 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
           // b/c the stage numbering is non-deterministic, so stage number alone doesn't tell
           // us what to check
       }
       (task.stageId, task.stageAttemptId, task.partitionId) match {
         case (stage, 0, _) if stage < 4 =>
+          val shuffleId =
+            scheduler.stageIdToStage(stage).asInstanceOf[ShuffleMapStage].shuffleDep.shuffleId
           backend.taskSuccess(taskDescription,
-            DAGSchedulerSuite.makeMapStatus("hostA", stageToOutputParts(stage)))
+            DAGSchedulerSuite.makeMapStatus("hostA", shuffleIdToOutputParts(shuffleId)))
         case (4, 0, partition) =>
           backend.taskSuccess(taskDescription, 4321 + partition)
       }