diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c735220da2e153b0b3abbc5c1f4859c8eea92453..8eaf9dfcf49b1e18c935a5ddb312961e82064528 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1569,24 +1569,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
-  test("run trivial shuffle with out-of-band failure and retry") {
+  /**
+   * In this test, we run a map stage where one of the executors fails but we still receive a
+   * "zombie" complete message from a task that ran on that executor. We want to make sure the
+   * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
+   * that the stage is only marked as finished once that task completes.
+   */
+  test("run trivial shuffle with out-of-band executor failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0))
-    // blockManagerMaster.removeExecutor("exec-hostA")
-    // pretend we were told hostA went away
+    // Tell the DAGScheduler that hostA was lost.
     runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
-    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
-    // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
+    // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
+    // should re-submit the stage with one task (the task that originally ran on HostA).
+    assert(taskSets.size === 2)
+    assert(taskSets(1).tasks.size === 1)
+
+    // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
+    // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
+    // alive executors).
+    assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
+
     // have hostC complete the resubmitted task
     complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // Make sure that the reduce stage was now submitted.
+    assert(taskSets.size === 3)
+    assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])
+
+    // Complete the reduce stage.
     complete(taskSets(2), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
@@ -2031,6 +2052,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
    * In this test, we run a map stage where one of the executors fails but we still receive a
    * "zombie" complete message from that executor. We want to make sure the stage is not reported
    * as done until all tasks have completed.
+   *
+   * Most of the functionality in this test is tested in "run trivial shuffle with out-of-band
+   * executor failure and retry".  However, that test uses ShuffleMapStages that are followed by
+   * a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a
+   * ResultStage after it.
    */
   test("map stage submission with executor failure late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 3, Nil)
@@ -2042,7 +2068,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
     assert(results.size === 0)    // Map stage job should not be complete yet
 
-    // Pretend host A was lost
+    // Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it
+    // completed on hostA.
     val oldEpoch = mapOutputTracker.getEpoch
     runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
     val newEpoch = mapOutputTracker.getEpoch
@@ -2054,13 +2081,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
     // A completion from another task should work because it's a non-failed host
     runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
+
+    // At this point, no more tasks are running for the stage (and the TaskSetManager considers
+    // the stage complete), but the task that ran on hostA needs to be re-run, so the map stage
+    // shouldn't be marked as complete, and the DAGScheduler should re-submit the stage.
+    assert(results.size === 0)
+    assert(taskSets.size === 2)
 
     // Now complete tasks in the second task set
     val newTaskSet = taskSets(1)
-    assert(newTaskSet.tasks.size === 2)     // Both tasks 0 and 1 were on hostA
+    // 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA).
+    assert(newTaskSet.tasks.size === 2)
+    // Complete task 0 from the original task set (i.e., not hte one that's currently active).
+    // This should still be counted towards the job being complete (but there's still one
+    // outstanding task).
     runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
+    assert(results.size === 0)
+
+    // Complete the final task, from the currently active task set.  There's still one
+    // running task, task 0 in the currently active stage attempt, but the success of task 0 means
+    // the DAGScheduler can mark the stage as finished.
     runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
     assert(results.size === 1)    // Map stage job should now finally be complete
     assertDataStructuresEmpty()