Skip to content
Snippets Groups Projects
Commit 8c81068e authored by Kay Ousterhout's avatar Kay Ousterhout
Browse files

Fixed >100char lines in DAGScheduler.scala

parent 7be1e577
No related branches found
No related tags found
No related merge requests found
...@@ -152,7 +152,8 @@ class DAGScheduler( ...@@ -152,7 +152,8 @@ class DAGScheduler(
val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
val running = new HashSet[Stage] // Stages we are running right now val running = new HashSet[Stage] // Stages we are running right now
val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage // Missing tasks from each stage
val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
val activeJobs = new HashSet[ActiveJob] val activeJobs = new HashSet[ActiveJob]
...@@ -239,7 +240,8 @@ class DAGScheduler( ...@@ -239,7 +240,8 @@ class DAGScheduler(
shuffleToMapStage.get(shuffleDep.shuffleId) match { shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage case Some(stage) => stage
case None => case None =>
val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) val stage =
newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage shuffleToMapStage(shuffleDep.shuffleId) = stage
stage stage
} }
...@@ -248,7 +250,8 @@ class DAGScheduler( ...@@ -248,7 +250,8 @@ class DAGScheduler(
/** /**
* Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
* of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided
* jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage directly. * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
* directly.
*/ */
private def newStage( private def newStage(
rdd: RDD[_], rdd: RDD[_],
...@@ -358,7 +361,8 @@ class DAGScheduler( ...@@ -358,7 +361,8 @@ class DAGScheduler(
stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
val parents = getParentStages(s.rdd, jobId) val parents = getParentStages(s.rdd, jobId)
val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId))) val parentsWithoutThisJobId = parents.filter(
p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail) updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
} }
} }
...@@ -366,8 +370,9 @@ class DAGScheduler( ...@@ -366,8 +370,9 @@ class DAGScheduler(
} }
/** /**
* Removes job and any stages that are not needed by any other job. Returns the set of ids for stages that * Removes job and any stages that are not needed by any other job. Returns the set of ids for
* were removed. The associated tasks for those stages need to be cancelled if we got here via job cancellation. * stages that were removed. The associated tasks for those stages need to be cancelled if we
* got here via job cancellation.
*/ */
private def removeJobAndIndependentStages(jobId: Int): Set[Int] = { private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
val registeredStages = jobIdToStageIds(jobId) val registeredStages = jobIdToStageIds(jobId)
...@@ -378,7 +383,8 @@ class DAGScheduler( ...@@ -378,7 +383,8 @@ class DAGScheduler(
stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach { stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
case (stageId, jobSet) => case (stageId, jobSet) =>
if (!jobSet.contains(jobId)) { if (!jobSet.contains(jobId)) {
logError("Job %d not registered for stage %d even though that stage was registered for the job" logError(
"Job %d not registered for stage %d even though that stage was registered for the job"
.format(jobId, stageId)) .format(jobId, stageId))
} else { } else {
def removeStage(stageId: Int) { def removeStage(stageId: Int) {
...@@ -389,7 +395,8 @@ class DAGScheduler( ...@@ -389,7 +395,8 @@ class DAGScheduler(
running -= s running -= s
} }
stageToInfos -= s stageToInfos -= s
shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove) shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(
shuffleToMapStage.remove)
if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) { if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
logDebug("Removing pending status for stage %d".format(stageId)) logDebug("Removing pending status for stage %d".format(stageId))
} }
...@@ -407,7 +414,8 @@ class DAGScheduler( ...@@ -407,7 +414,8 @@ class DAGScheduler(
stageIdToStage -= stageId stageIdToStage -= stageId
stageIdToJobIds -= stageId stageIdToJobIds -= stageId
logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size)) logDebug("After removal of stage %d, remaining stages = %d"
.format(stageId, stageIdToStage.size))
} }
jobSet -= jobId jobSet -= jobId
...@@ -459,7 +467,8 @@ class DAGScheduler( ...@@ -459,7 +467,8 @@ class DAGScheduler(
assert(partitions.size > 0) assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties) eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter waiter
} }
...@@ -494,7 +503,8 @@ class DAGScheduler( ...@@ -494,7 +503,8 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement() val jobId = nextJobId.getAndIncrement()
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties) eventProcessActor ! JobSubmitted(
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
listener.awaitResult() // Will throw an exception if the job fails listener.awaitResult() // Will throw an exception if the job fails
} }
...@@ -529,8 +539,8 @@ class DAGScheduler( ...@@ -529,8 +539,8 @@ class DAGScheduler(
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
var finalStage: Stage = null var finalStage: Stage = null
try { try {
// New stage creation at times and if its not protected, the scheduler thread is killed. // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
// e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted // whose underlying HDFS files have been deleted.
finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
} catch { } catch {
case e: Exception => case e: Exception =>
...@@ -563,7 +573,8 @@ class DAGScheduler( ...@@ -563,7 +573,8 @@ class DAGScheduler(
case JobGroupCancelled(groupId) => case JobGroupCancelled(groupId) =>
// Cancel all jobs belonging to this job group. // Cancel all jobs belonging to this job group.
// First finds all active jobs with this group id, and then kill stages for them. // First finds all active jobs with this group id, and then kill stages for them.
val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID)) val activeInGroup = activeJobs.filter(
groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId) val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach { handleJobCancellation } jobIds.foreach { handleJobCancellation }
...@@ -585,7 +596,8 @@ class DAGScheduler( ...@@ -585,7 +596,8 @@ class DAGScheduler(
stage <- stageIdToStage.get(task.stageId); stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage) stageInfo <- stageToInfos.get(stage)
) { ) {
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) { if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
!stageInfo.emittedTaskSizeWarning) {
stageInfo.emittedTaskSizeWarning = true stageInfo.emittedTaskSizeWarning = true
logWarning(("Stage %d (%s) contains a task of very large " + logWarning(("Stage %d (%s) contains a task of very large " +
"size (%d KB). The maximum recommended task size is %d KB.").format( "size (%d KB). The maximum recommended task size is %d KB.").format(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment