Commit 698ef762 authored by Takuya UESHIN, committed by Kay Ousterhout

[SPARK-14269][SCHEDULER] Eliminate unnecessary submitStage() call.

## What changes were proposed in this pull request?

Currently, `submitWaitingStages()` is called on every iteration of the `DAGScheduler` event loop to try to submit all waiting stages, but most of these calls are unnecessary because the event being handled has no effect on stage status.
Waiting stages only need to be considered for submission when one of their parent stages has completed successfully.

This elimination can improve `DAGScheduler` performance.
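As a rough sketch of the idea (not the actual `DAGScheduler` code; the `Stage` case class, the standalone `waitingStages` set, and the `println`-based `submitStage` below are simplified stand-ins for illustration), the change replaces the blanket re-submission of every waiting stage with a lookup of only those waiting stages whose parents include the stage that just completed:

```scala
import scala.collection.mutable

// Simplified stand-in for the scheduler's Stage class (illustration only).
case class Stage(id: Int, firstJobId: Int, parents: List[Stage])

object SubmitWaitingSketch {
  val waitingStages = new mutable.HashSet[Stage]

  def submitStage(stage: Stage): Unit = println(s"submitting stage ${stage.id}")

  // Old behaviour (sketch): drain and re-submit *all* waiting stages on every
  // event-loop iteration, whether or not anything they depend on has finished.
  def submitWaitingStages(): Unit = {
    val waitingStagesCopy = waitingStages.toArray
    waitingStages.clear()
    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }

  // New behaviour (sketch): when a parent stage completes successfully, submit
  // only the waiting stages that list it among their parents.
  def submitWaitingChildStages(parent: Stage): Unit = {
    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
    waitingStages --= childStages
    for (stage <- childStages.sortBy(_.firstJobId)) {
      submitStage(stage)
    }
  }
}
```

With on the order of 2000 stages, the old version walks the entire waiting set on every scheduler event (re-adding stages whose parents are still missing), while the new version touches the waiting set only when a parent stage actually completes.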

## How was this patch tested?

Added some checks to existing tests, relied on the other existing test suites, and verified the change against our own projects.

We have a project bottlenecked by `DAGScheduler`; it has about 2000 stages.

Before this patch, almost all of the execution time in the `Driver` process was spent processing `submitStage()` on the `dag-scheduler-event-loop` thread. After this patch, performance improved as follows:

|        | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` |
|--------|---------------------:|---------------------------------------:|----------------:|
| Before |              760 sec |                                710 sec |         667 sec |
| After  |              440 sec |                                 14 sec |          10 sec |
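In other words, the event-loop thread went from consuming roughly 93% of the driver's execution time (710 of 760 seconds, 667 of which were in `submitStage()`) to roughly 3% (14 of 440 seconds), and total execution time dropped by about 42%.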

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #12060 from ueshin/issues/SPARK-14269.
parent c875d81a
@@ -726,7 +726,6 @@ class DAGScheduler(
         reason = "as part of cancellation of all jobs"))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
-    submitWaitingStages()
   }

   /**
@@ -752,23 +751,21 @@ class DAGScheduler(
         submitStage(stage)
       }
     }
-    submitWaitingStages()
   }

   /**
    * Check for waiting stages which are now eligible for resubmission.
-   * Ordinarily run on every iteration of the event loop.
+   * Submits stages that depend on the given parent stage. Called when the parent stage completes
+   * successfully.
    */
-  private def submitWaitingStages() {
-    // TODO: We might want to run this less often, when we are sure that something has become
-    // runnable that wasn't before.
-    logTrace("Checking for newly runnable parent stages")
+  private def submitWaitingChildStages(parent: Stage) {
+    logTrace(s"Checking if any dependencies of $parent are now runnable")
     logTrace("running: " + runningStages)
     logTrace("waiting: " + waitingStages)
     logTrace("failed: " + failedStages)
-    val waitingStagesCopy = waitingStages.toArray
-    waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
+    val childStages = waitingStages.filter(_.parents.contains(parent)).toArray
+    waitingStages --= childStages
+    for (stage <- childStages.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -793,7 +790,6 @@ class DAGScheduler(
     }
     val jobIds = activeInGroup.map(_.jobId)
     jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
-    submitWaitingStages()
   }

   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
@@ -801,7 +797,6 @@ class DAGScheduler(
     // In that case, we wouldn't have the stage anymore in stageIdToStage.
     val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
     listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
-    submitWaitingStages()
   }

   private[scheduler] def handleTaskSetFailed(
@@ -809,7 +804,6 @@ class DAGScheduler(
       reason: String,
       exception: Option[Throwable]): Unit = {
     stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason, exception) }
-    submitWaitingStages()
   }

   private[scheduler] def cleanUpAfterSchedulerStop() {
@@ -832,7 +826,6 @@ class DAGScheduler(
   private[scheduler] def handleGetTaskResult(taskInfo: TaskInfo) {
     listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
-    submitWaitingStages()
   }

   private[scheduler] def handleJobSubmitted(jobId: Int,
@@ -871,8 +864,6 @@ class DAGScheduler(
     listenerBus.post(
       SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
     submitStage(finalStage)
-
-    submitWaitingStages()
   }

   private[scheduler] def handleMapStageSubmitted(jobId: Int,
@@ -916,8 +907,6 @@ class DAGScheduler(
     if (finalStage.isAvailable) {
       markMapStageJobAsFinished(job, mapOutputTracker.getStatistics(dependency))
     }
-
-    submitWaitingStages()
   }

   /** Submits stage, but first recursively submits any missing parents. */
@@ -1073,6 +1062,8 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
+
+      submitWaitingChildStages(stage)
     }
   }
@@ -1238,9 +1229,8 @@ class DAGScheduler(
                 markMapStageJobAsFinished(job, stats)
               }
             }
+            submitWaitingChildStages(shuffleStage)
           }
-
-          // Note: newly runnable stages will be submitted below when we submit waiting stages
         }
     }
@@ -1315,7 +1305,6 @@ class DAGScheduler(
       // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
       // will abort the job.
     }
-    submitWaitingStages()
   }

   /**
@@ -1357,7 +1346,6 @@ class DAGScheduler(
       logDebug("Additional executor lost message for " + execId +
         "(epoch " + currentEpoch + ")")
     }
-    submitWaitingStages()
   }

   private[scheduler] def handleExecutorAdded(execId: String, host: String) {
@@ -1366,7 +1354,6 @@ class DAGScheduler(
       logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
-    submitWaitingStages()
   }

   private[scheduler] def handleStageCancellation(stageId: Int) {
@@ -1379,7 +1366,6 @@ class DAGScheduler(
       case None =>
         logInfo("No active jobs to kill for Stage " + stageId)
     }
-    submitWaitingStages()
   }

   private[scheduler] def handleJobCancellation(jobId: Int, reason: String = "") {
@@ -1389,7 +1375,6 @@ class DAGScheduler(
       failJobAndIndependentStages(
         jobIdToActiveJob(jobId), "Job %d cancelled %s".format(jobId, reason))
     }
-    submitWaitingStages()
   }

   /**
@@ -370,6 +370,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeouts
     assert(mapStageB.parents === List(mapStageA))
     assert(mapStageC.parents === List(mapStageA, mapStageB))
     assert(finalStage.parents === List(mapStageC))
+
+    complete(taskSets(0), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(1), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(3), Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty()
   }

   test("zero split job") {