diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c82ae4baa363095c252ec54be9a4dafed4ba4a03..c912520fded3b429e9c86eb265aaff58eb9fe3bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -50,6 +50,10 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
  * not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
  * a small number of times before cancelling the whole stage.
  *
+ * Here's a checklist to use when making or reviewing changes to this class:
+ *
+ *  - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
+ *    include the new structure. This will help to catch memory leaks.
  */
 private[spark]
 class DAGScheduler(
@@ -111,6 +115,8 @@ class DAGScheduler(
   //       stray messages to detect.
   private val failedEpoch = new HashMap[String, Long]
 
+  private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
+
   // A closure serializer that we reuse.
   // This is only safe because DAGScheduler runs in a single thread.
   private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
@@ -128,8 +134,6 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
-  private val outputCommitCoordinator = env.outputCommitCoordinator
-
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -710,9 +714,10 @@ class DAGScheduler(
       // cancelling the stages because if the DAG scheduler is stopped, the entire application
       // is in the process of getting stopped.
       val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
-      runningStages.foreach { stage =>
-        stage.latestInfo.stageFailed(stageFailedMessage)
-        listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // The `toArray` here is necessary so that we don't iterate over `runningStages` while
+      // mutating it.
+      runningStages.toArray.foreach { stage =>
+        markStageAsFinished(stage, Some(stageFailedMessage))
       }
       listenerBus.post(SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobFailed(error)))
     }
@@ -887,10 +892,9 @@ class DAGScheduler(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
-      // Because we posted SparkListenerStageSubmitted earlier, we should post
-      // SparkListenerStageCompleted here in case there are no tasks to run.
-      outputCommitCoordinator.stageEnd(stage.id)
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+      // Because we posted SparkListenerStageSubmitted earlier, we should mark
+      // the stage as completed here in case there are no tasks to run
+      markStageAsFinished(stage, None)
 
       val debugString = stage match {
         case stage: ShuffleMapStage =>
@@ -902,7 +906,6 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      runningStages -= stage
     }
   }
 
@@ -968,22 +971,6 @@ class DAGScheduler(
     }
 
     val stage = stageIdToStage(task.stageId)
-
-    def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
-      val serviceTime = stage.latestInfo.submissionTime match {
-        case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
-        case _ => "Unknown"
-      }
-      if (errorMessage.isEmpty) {
-        logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
-        stage.latestInfo.completionTime = Some(clock.getTimeMillis())
-      } else {
-        stage.latestInfo.stageFailed(errorMessage.get)
-        logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
-      }
-      listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
-      runningStages -= stage
-    }
     event.reason match {
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
@@ -1099,7 +1086,6 @@ class DAGScheduler(
           logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
             s"due to a fetch failure from $mapStage (${mapStage.name})")
           markStageAsFinished(failedStage, Some(failureMessage))
-          runningStages -= failedStage
         }
 
         if (disallowStageRetryForTest) {
@@ -1215,6 +1201,26 @@ class DAGScheduler(
     submitWaitingStages()
   }
 
+  /**
+   * Marks a stage as finished and removes it from the list of running stages.
+   */
+  private def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None): Unit = {
+    val serviceTime = stage.latestInfo.submissionTime match {
+      case Some(t) => "%.03f".format((clock.getTimeMillis() - t) / 1000.0)
+      case _ => "Unknown"
+    }
+    if (errorMessage.isEmpty) {
+      logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+      stage.latestInfo.completionTime = Some(clock.getTimeMillis())
+    } else {
+      stage.latestInfo.stageFailed(errorMessage.get)
+      logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+    }
+    outputCommitCoordinator.stageEnd(stage.id)
+    listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+    runningStages -= stage
+  }
+
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
    * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
@@ -1264,8 +1270,7 @@ class DAGScheduler(
           if (runningStages.contains(stage)) {
             try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
               taskScheduler.cancelTasks(stageId, shouldInterruptThread)
-              stage.latestInfo.stageFailed(failureReason)
-              listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
+              markStageAsFinished(stage, Some(failureReason))
             } catch {
               case e: UnsupportedOperationException =>
                 logInfo(s"Could not cancel tasks for stage $stageId", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 9e29fd13821dce46b6558dc399a85fb98b7529f2..7c184b1dcb308faa2729db8d35ae409eaa7e67fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -59,6 +59,13 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf) extends Logging {
   private val authorizedCommittersByStage: CommittersByStageMap = mutable.Map()
   private type CommittersByStageMap = mutable.Map[StageId, mutable.Map[PartitionId, TaskAttemptId]]
 
+  /**
+   * Returns whether the OutputCommitCoordinator's internal data structures are all empty.
+   */
+  def isEmpty: Boolean = {
+    authorizedCommittersByStage.isEmpty
+  }
+
   /**
    * Called by tasks to ask whether they can commit their output to HDFS.
    *
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 63360a0f189a3e4db284504044a103e971aea852..eb759f0807a174bed61753c7bcf047551b0d52b9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -783,6 +783,7 @@ class DAGSchedulerSuite extends FunSuiteLike  with BeforeAndAfter with LocalSpar
     assert(scheduler.runningStages.isEmpty)
     assert(scheduler.shuffleToMapStage.isEmpty)
     assert(scheduler.waitingStages.isEmpty)
+    assert(scheduler.outputCommitCoordinator.isEmpty)
   }
 
   // Nothing in this test should break if the task info's fields are null, but