Skip to content
Snippets Groups Projects
Commit 3d3f8c80 authored by CrazyJvm's avatar CrazyJvm Committed by Reynold Xin
Browse files

Use pluggable clock in DAGScheduler #SPARK-2031

DAGScheduler supports pluggable clock like what TaskSetManager does.

Author: CrazyJvm <crazyjvm@gmail.com>

Closes #976 from CrazyJvm/clock and squashes the following commits:

6779a4c [CrazyJvm] Use pluggable clock in DAGScheduler
parent c7a183b2
No related branches found
No related tags found
No related merge requests found
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{SystemClock, Clock, Utils}

 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -61,7 +61,8 @@ class DAGScheduler(
     listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
-    env: SparkEnv)
+    env: SparkEnv,
+    clock: Clock = SystemClock)
   extends Logging {

   import DAGScheduler._
@@ -781,7 +782,7 @@ class DAGScheduler(
       logDebug("New pending tasks: " + myPending)
       taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
-      stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
+      stageToInfos(stage).submissionTime = Some(clock.getTime())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
@@ -807,11 +808,11 @@ class DAGScheduler(
   def markStageAsFinished(stage: Stage) = {
     val serviceTime = stageToInfos(stage).submissionTime match {
-      case Some(t) => "%.03f".format((System.currentTimeMillis() - t) / 1000.0)
+      case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
       case _ => "Unknown"
     }
     logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-    stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(stage).completionTime = Some(clock.getTime())
     listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
     runningStages -= stage
   }
@@ -1015,7 +1016,7 @@ class DAGScheduler(
       return
     }
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
-    stageToInfos(failedStage).completionTime = Some(System.currentTimeMillis())
+    stageToInfos(failedStage).completionTime = Some(clock.getTime())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment