Skip to content
Snippets Groups Projects
Commit 3fb57a0a authored by Reynold Xin's avatar Reynold Xin Committed by Matei Zaharia
Browse files

[SPARK-3353] parent stage should have lower stage id.

Previously parent stages had higher stage id, but parent stages are executed first. This pull request changes the behavior so parent stages would have lower stage id.

For example, command:
```scala
sc.parallelize(1 to 10).map(x=>(x,x)).reduceByKey(_+_).count
```
breaks down into 2 stages.

The old web UI:
![screen shot 2014-09-04 at 12 42 44 am](https://cloud.githubusercontent.com/assets/323388/4146177/60fb4f42-3407-11e4-819f-853eb0e22b25.png)

Web UI with this patch:
![screen shot 2014-09-04 at 12 44 55 am](https://cloud.githubusercontent.com/assets/323388/4146178/62e08e62-3407-11e4-867b-a36b10534464.png)

Author: Reynold Xin <rxin@apache.org>

Closes #2273 from rxin/lower-stage-id and squashes the following commits:

abbb4c6 [Reynold Xin] Fixed SparkListenerSuite.
0e02379 [Reynold Xin] Updated DAGSchedulerSuite.
54ccea3 [Reynold Xin] [SPARK-3353] parent stage should have lower stage id.
parent 110fb8b2
No related branches found
No related tags found
No related merge requests found
...@@ -241,9 +241,9 @@ class DAGScheduler( ...@@ -241,9 +241,9 @@ class DAGScheduler(
callSite: CallSite) callSite: CallSite)
: Stage = : Stage =
{ {
val parentStages = getParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement() val id = nextStageId.getAndIncrement()
val stage = val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage) updateJobIdStageIdMaps(jobId, stage)
stage stage
......
...@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts ...@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._ import org.scalatest.time.SpanSugar._
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
...@@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F ...@@ -97,10 +98,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
/** Length of time to wait while draining listener events. */ /** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000 val WAIT_TIMEOUT_MILLIS = 10000
val sparkListener = new SparkListener() { val sparkListener = new SparkListener() {
val successfulStages = new HashSet[Int]() val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]() val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) { override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
val stageInfo = stageCompleted.stageInfo val stageInfo = stageCompleted.stageInfo
stageByOrderOfExecution += stageInfo.stageId
if (stageInfo.failureReason.isEmpty) { if (stageInfo.failureReason.isEmpty) {
successfulStages += stageInfo.stageId successfulStages += stageInfo.stageId
} else { } else {
...@@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F ...@@ -231,6 +234,13 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
runEvent(JobCancelled(jobId)) runEvent(JobCancelled(jobId))
} }
test("[SPARK-3353] parent stage should have lower stage id") {
sparkListener.stageByOrderOfExecution.clear()
sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
assert(sparkListener.stageByOrderOfExecution.length === 2)
assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
}
test("zero split job") { test("zero split job") {
var numResults = 0 var numResults = 0
val fakeListener = new JobListener() { val fakeListener = new JobListener() {
...@@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F ...@@ -457,7 +467,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
null, null,
null)) null))
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(0)) assert(sparkListener.failedStages.contains(1))
// The second ResultTask fails, with a fetch failure for the output from the second mapper. // The second ResultTask fails, with a fetch failure for the output from the second mapper.
runEvent(CompletionEvent( runEvent(CompletionEvent(
...@@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F ...@@ -515,8 +525,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
// Listener bus should get told about the map stage failing, but not the reduce stage // Listener bus should get told about the map stage failing, but not the reduce stage
// (since the reduce stage hasn't been started yet). // (since the reduce stage hasn't been started yet).
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.failedStages.contains(1)) assert(sparkListener.failedStages.toSet === Set(0))
assert(sparkListener.failedStages.size === 1)
assertDataStructuresEmpty assertDataStructuresEmpty
} }
...@@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F ...@@ -563,14 +572,12 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
val stageFailureMessage = "Exception failure in map stage" val stageFailureMessage = "Exception failure in map stage"
failed(taskSets(0), stageFailureMessage) failed(taskSets(0), stageFailureMessage)
assert(cancelledStages.contains(1)) assert(cancelledStages.toSet === Set(0, 2))
// Make sure the listeners got told about both failed stages. // Make sure the listeners got told about both failed stages.
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(sparkListener.successfulStages.isEmpty) assert(sparkListener.successfulStages.isEmpty)
assert(sparkListener.failedStages.contains(1)) assert(sparkListener.failedStages.toSet === Set(0, 2))
assert(sparkListener.failedStages.contains(3))
assert(sparkListener.failedStages.size === 2)
assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage") assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
......
...@@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers ...@@ -180,7 +180,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
rdd3.count() rdd3.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 3).get
stageInfo3.rddInfos.size should be {1} // ShuffledRDD stageInfo3.rddInfos.size should be {1} // ShuffledRDD
stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true} stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
stageInfo3.rddInfos.exists(_.name == "Trois") should be {true} stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment