diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 963d15b76d4c4d7495eed8e7fd01d86588c67334..c48a3d64ef72096611550bf2f84983a50967c011 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -152,7 +152,8 @@ class DAGScheduler(
   val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
   val running = new HashSet[Stage] // Stages we are running right now
   val failed = new HashSet[Stage]  // Stages that must be resubmitted due to fetch failures
-  val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]] // Missing tasks from each stage
+  // Missing tasks from each stage
+  val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
   var lastFetchFailureTime: Long = 0  // Used to wait a bit to avoid repeated resubmits
 
   val activeJobs = new HashSet[ActiveJob]
@@ -239,7 +240,8 @@ class DAGScheduler(
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
-        val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
+        val stage =
+          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
         stage
     }
@@ -248,7 +250,8 @@ class DAGScheduler(
   /**
    * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
    * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
-   * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage directly.
+   * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
+   * directly.
    */
   private def newStage(
       rdd: RDD[_],
@@ -358,7 +361,8 @@ class DAGScheduler(
         stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
         jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
         val parents = getParentStages(s.rdd, jobId)
-        val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
+        val parentsWithoutThisJobId = parents.filter(p =>
+          !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
         updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
       }
     }
@@ -366,8 +370,9 @@ class DAGScheduler(
   }
 
   /**
-   * Removes job and any stages that are not needed by any other job.  Returns the set of ids for stages that
-   * were removed.  The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+   * Removes a job and any stages that are not needed by any other job.  Returns the set of ids
+   * for stages that were removed.  The associated tasks for those stages need to be cancelled
+   * if we got here via job cancellation.
    */
   private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
@@ -378,7 +383,8 @@ class DAGScheduler(
       stageIdToJobIds.filterKeys(stageId => registeredStages.contains(stageId)).foreach {
         case (stageId, jobSet) =>
           if (!jobSet.contains(jobId)) {
-            logError("Job %d not registered for stage %d even though that stage was registered for the job"
+            logError(
+              "Job %d not registered for stage %d even though that stage was registered for the job"
               .format(jobId, stageId))
           } else {
             def removeStage(stageId: Int) {
@@ -389,7 +395,8 @@ class DAGScheduler(
                   running -= s
                 }
                 stageToInfos -= s
-                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleToMapStage.remove)
+                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
+                  shuffleToMapStage.remove(shuffleId))
                 if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
                   logDebug("Removing pending status for stage %d".format(stageId))
                 }
@@ -407,7 +414,8 @@ class DAGScheduler(
               stageIdToStage -= stageId
               stageIdToJobIds -= stageId
 
-              logDebug("After removal of stage %d, remaining stages = %d".format(stageId, stageIdToStage.size))
+              logDebug("After removal of stage %d, remaining stages = %d"
+                .format(stageId, stageIdToStage.size))
             }
 
             jobSet -= jobId
@@ -459,7 +467,8 @@ class DAGScheduler(
     assert(partitions.size > 0)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
-    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
+    eventProcessActor ! JobSubmitted(
+      jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
     waiter
   }
 
@@ -494,7 +503,8 @@ class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
     val jobId = nextJobId.getAndIncrement()
-    eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
+    eventProcessActor ! JobSubmitted(
+      jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -529,8 +539,8 @@ class DAGScheduler(
       case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
         var finalStage: Stage = null
         try {
-          // New stage creation at times and if its not protected, the scheduler thread is killed. 
-          // e.g. it can fail when jobs are run on HadoopRDD whose underlying hdfs files have been deleted
+          // New stage creation may throw an exception if, for example, jobs are run on a HadoopRDD
+          // whose underlying HDFS files have been deleted.
           finalStage = newStage(rdd, partitions.size, None, jobId, Some(callSite))
         } catch {
           case e: Exception =>
@@ -563,7 +573,8 @@ class DAGScheduler(
       case JobGroupCancelled(groupId) =>
         // Cancel all jobs belonging to this job group.
        // First finds all active jobs with this group id, and then kills stages for them.
-        val activeInGroup = activeJobs.filter(groupId == _.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
+        val activeInGroup = activeJobs.filter(activeJob =>
+          groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
         val jobIds = activeInGroup.map(_.jobId)
         jobIds.foreach { handleJobCancellation }
 
@@ -585,7 +596,8 @@ class DAGScheduler(
           stage <- stageIdToStage.get(task.stageId);
           stageInfo <- stageToInfos.get(stage)
         ) {
-          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
+          if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 &&
+              !stageInfo.emittedTaskSizeWarning) {
             stageInfo.emittedTaskSizeWarning = true
             logWarning(("Stage %d (%s) contains a task of very large " +
               "size (%d KB). The maximum recommended task size is %d KB.").format(