diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b9d7e1328d149deff90e3dd528ba34c19eb2d36d..69101acb3ac8bffd87b1498079d57ad4b5e81346 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -932,8 +932,6 @@ class DAGScheduler(
   /** Called when stage's parents are available and we can now do its task. */
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
-    // Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingPartitions.clear()
 
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
@@ -1013,9 +1011,13 @@
       val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
       stage match {
         case stage: ShuffleMapStage =>
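+          // Rebuild pendingPartitions for this attempt: clear it here and re-add each
+          // partition below as its ShuffleMapTask is created.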
+          stage.pendingPartitions.clear()
           partitionsToCompute.map { id =>
             val locs = taskIdToLocations(id)
             val part = stage.rdd.partitions(id)
+            stage.pendingPartitions += id
             new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
               taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
               Option(sc.applicationId), sc.applicationAttemptId)
@@ -1039,9 +1039,8 @@ class DAGScheduler(
     }
 
     if (tasks.size > 0) {
-      logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
-      stage.pendingPartitions ++= tasks.map(_.partitionId)
-      logDebug("New pending partitions: " + stage.pendingPartitions)
+      logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
+        s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1147,7 +1146,6 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask
@@ -1183,6 +1181,9 @@
 
           case smt: ShuffleMapTask =>
             val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
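+            // A success from any attempt of this stage marks the partition as no longer
+            // pending (see ShuffleMapStage.pendingPartitions for the resulting invariant).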
+            shuffleStage.pendingPartitions -= task.partitionId
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
@@ -1235,7 +1234,16 @@
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingPartitions += task.partitionId
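+        // Only ShuffleMapStages track pendingPartitions; a Resubmitted status is not
+        // expected for tasks in other stage types.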
+        stage match {
+          case sms: ShuffleMapStage =>
+            sms.pendingPartitions += task.partitionId
+
+          case _ =>
+            assert(false, "TaskSetManagers should only send Resubmitted task statuses for " +
+              "tasks in ShuffleMapStages.")
+        }
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index 51416e5ce97fc2086969548bfc06c9954e2755ec..db4d9efa2270c80182c6482d1d0cb01536f945cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashSet
+
 import org.apache.spark.ShuffleDependency
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
@@ -47,6 +49,17 @@ private[spark] class ShuffleMapStage(
 
   private[this] var _numAvailableOutputs: Int = 0
 
+  /**
+   * Partitions that either haven't yet been computed, or that were computed on an executor
+   * that has since been lost, so should be re-computed. This variable is used by the
+   * DAGScheduler to determine when a stage has completed. Task successes in either the active
+   * attempt for the stage or in earlier attempts for this stage can cause partition ids to be
+   * removed from pendingPartitions. As a result, this variable may be inconsistent with the
+   * pending tasks in the TaskSetManager for the active attempt for the stage (the partitions
+   * stored here will always be a subset of those that the TaskSetManager thinks are pending).
+   */
+  val pendingPartitions = new HashSet[Int]
+
   /**
    * List of [[MapStatus]] for each partition. The index of the array is the map partition id,
    * and each value in the array is the list of possible [[MapStatus]] for a partition
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c628dd38d07bbe93ff45e570354a156ed023276b..c6fc03812921eb0468a9c44db4fab0b228dde56f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -67,8 +67,6 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  val pendingPartitions = new HashSet[Int]
-
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0