diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c9a66e5044035a837af32cf59917e31111472c7..394228b2728d12678db00ab2440b940c5db5a874 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -944,7 +944,7 @@ class DAGScheduler(
   private def submitMissingTasks(stage: Stage, jobId: Int) {
     logDebug("submitMissingTasks(" + stage + ")")
     // Get our pending tasks and remember them in our pendingTasks entry
-    stage.pendingTasks.clear()
+    stage.pendingPartitions.clear()
 
     // First figure out the indexes of partition ids to compute.
     val (allPartitions: Seq[Int], partitionsToCompute: Seq[Int]) = {
@@ -1060,8 +1060,8 @@ class DAGScheduler(
 
     if (tasks.size > 0) {
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
-      stage.pendingTasks ++= tasks
-      logDebug("New pending tasks: " + stage.pendingTasks)
+      stage.pendingPartitions ++= tasks.map(_.partitionId)
+      logDebug("New pending partitions: " + stage.pendingPartitions)
       taskScheduler.submitTasks(new TaskSet(
         tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
@@ -1152,7 +1152,7 @@ class DAGScheduler(
       case Success =>
         listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
           event.reason, event.taskInfo, event.taskMetrics))
-        stage.pendingTasks -= task
+        stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>
             // Cast to ResultStage here because it's part of the ResultTask
@@ -1198,7 +1198,7 @@ class DAGScheduler(
               shuffleStage.addOutputLoc(smt.partitionId, status)
             }
 
-            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
+            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
               markStageAsFinished(shuffleStage)
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
@@ -1242,7 +1242,7 @@ class DAGScheduler(
 
       case Resubmitted =>
         logInfo("Resubmitted " + task + ", so marking it as still running")
-        stage.pendingTasks += task
+        stage.pendingPartitions += task.partitionId
 
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) =>
         val failedStage = stageIdToStage(task.stageId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index b37eccbd0f7b85692659e555c85dd46fe410b02c..a3829c319c48dc7d5bb6367d83745e43ceed24de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -66,7 +66,7 @@ private[scheduler] abstract class Stage(
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  var pendingTasks = new HashSet[Task[_]]
+  val pendingPartitions = new HashSet[Int]
 
   /** The ID to use for the next new attempt for this stage. */
   private var nextAttemptId: Int = 0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 62af9031b9f8b2d820bf7f0ec93cfccad637f31f..c02597c4365c96461f88c2277cb87bc2c93d488c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -487,8 +487,8 @@ private[spark] class TaskSetManager(
           // a good proxy to task serialization time.
           // val timeTaken = clock.getTime() - startTime
           val taskName = s"task ${info.id} in stage ${taskSet.id}"
-          logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
-              taskName, taskId, host, taskLocality, serializedTask.limit))
+          logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
+            s"$taskLocality, ${serializedTask.limit} bytes)")
 
           sched.dagScheduler.taskStarted(task, info)
           return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1c55f90ad9b44be120df5b3065249aba684dd7f4..6b5bcf0574de63855d4b0aa5113ff058f93c9ca5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -479,8 +479,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-        (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-        (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+        (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
         (Success, 42),
@@ -490,7 +490,7 @@ class DAGSchedulerSuite
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
     // we can see both result blocks now
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
@@ -782,8 +782,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
@@ -1035,6 +1035,173 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  /**
+   * This test runs a three stage job, with a fetch failure in stage 1.  but during the retry, we
+   * have completions from both the first & second attempt of stage 1.  So all the map output is
+   * available before we finish any task set for stage 1.  We want to make sure that we don't
+   * submit stage 2 until the map output for stage 1 is registered
+   */
+  test("don't submit stage until its dependencies map outputs are registered (SPARK-5259)") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val firstShuffleId = firstShuffleDep.shuffleId
+    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // things start out smoothly, stage 0 completes with no issues
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
+      (Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
+    ))
+
+    // then one executor dies, and a task fails in stage 1
+    runEvent(ExecutorLost("exec-hostA"))
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(0),
+      FetchFailed(null, firstShuffleId, 2, 0, "Fetch failed"),
+      null,
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // so we resubmit stage 0, which completes happily
+    scheduler.resubmitFailedStages()
+    val stage0Resubmit = taskSets(2)
+    assert(stage0Resubmit.stageId == 0)
+    assert(stage0Resubmit.stageAttemptId === 1)
+    val task = stage0Resubmit.tasks(0)
+    assert(task.partitionId === 2)
+    runEvent(CompletionEvent(
+      task,
+      Success,
+      makeMapStatus("hostC", shuffleMapRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // now here is where things get tricky : we will now have a task set representing
+    // the second attempt for stage 1, but we *also* have some tasks for the first attempt for
+    // stage 1 still going
+    val stage1Resubmit = taskSets(3)
+    assert(stage1Resubmit.stageId == 1)
+    assert(stage1Resubmit.stageAttemptId === 1)
+    assert(stage1Resubmit.tasks.length === 3)
+
+    // we'll have some tasks finish from the first attempt, and some finish from the second attempt,
+    // so that we actually have all stage outputs, though no attempt has completed all its
+    // tasks
+    runEvent(CompletionEvent(
+      taskSets(3).tasks(0),
+      Success,
+      makeMapStatus("hostC", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+    runEvent(CompletionEvent(
+      taskSets(3).tasks(1),
+      Success,
+      makeMapStatus("hostC", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+    // late task finish from the first attempt
+    runEvent(CompletionEvent(
+      taskSets(1).tasks(2),
+      Success,
+      makeMapStatus("hostB", reduceRdd.partitions.length),
+      null,
+      createFakeTaskInfo(),
+      null))
+
+    // What should happen now is that we submit stage 2.  However, we might not see an error
+    // b/c of DAGScheduler's error handling (it tends to swallow errors and just log them).  But
+    // we can check some conditions.
+    // Note that the really important thing here is not so much that we submit stage 2 *immediately*
+    // but that we don't end up with some error from these interleaved completions.  It would also
+    // be OK (though sub-optimal) if stage 2 simply waited until the resubmission of stage 1 had
+    // all its tasks complete
+
+    // check that we have all the map output for stage 0 (it should have been there even before
+    // the last round of completions from stage 1, but just to double check it hasn't been messed
+    // up) and also the newly available stage 1
+    val stageToReduceIdxs = Seq(
+      0 -> (0 until 3),
+      1 -> (0 until 1)
+    )
+    for {
+      (stage, reduceIdxs) <- stageToReduceIdxs
+      reduceIdx <- reduceIdxs
+    } {
+      // this would throw an exception if the map status hadn't been registered
+      val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx)
+      // really we should have already thrown an exception rather than fail either of these
+      // asserts, but just to be extra defensive let's double check the statuses are OK
+      assert(statuses != null)
+      assert(statuses.nonEmpty)
+    }
+
+    // and check that stage 2 has been submitted
+    assert(taskSets.size == 5)
+    val stage2TaskSet = taskSets(4)
+    assert(stage2TaskSet.stageId == 2)
+    assert(stage2TaskSet.stageAttemptId == 0)
+  }
+
+  /**
+   * We lose an executor after completing some shuffle map tasks on it.  Those tasks get
+   * resubmitted, and when they finish the job completes normally
+   */
+  test("register map outputs correctly after ExecutorLost and task Resubmitted") {
+    val firstRDD = new MyRDD(sc, 3, Nil)
+    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
+    val reduceRdd = new MyRDD(sc, 5, List(firstShuffleDep))
+    submit(reduceRdd, Array(0))
+
+    // complete some of the tasks from the first stage, on one host
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(0), Success,
+      makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(1), Success,
+      makeMapStatus("hostA", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
+
+    // now that host goes down
+    runEvent(ExecutorLost("exec-hostA"))
+
+    // so we resubmit those tasks
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(0), Resubmitted, null, null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(
+      taskSets(0).tasks(1), Resubmitted, null, null, createFakeTaskInfo(), null))
+
+    // now complete everything on a different host
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))
+    ))
+
+    // now we should submit stage 1, and the map output from stage 0 should be registered
+
+    // check that we have all the map output for stage 0
+    (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
+      val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
+      // really we should have already thrown an exception rather than fail either of these
+      // asserts, but just to be extra defensive let's double check the statuses are OK
+      assert(statuses != null)
+      assert(statuses.nonEmpty)
+    }
+
+    // and check that stage 1 has been submitted
+    assert(taskSets.size == 2)
+    val stage1TaskSet = taskSets(1)
+    assert(stage1TaskSet.stageId == 1)
+    assert(stage1TaskSet.stageAttemptId == 0)
+  }
+
   /**
    * Makes sure that failures of stage used by multiple jobs are correctly handled.
    *
@@ -1393,8 +1560,8 @@ class DAGSchedulerSuite
     // Submit a map stage by itself
     submitMapStage(shuffleDep)
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
-      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
     assert(results.size === 1)
     results.clear()
     assertDataStructuresEmpty()
@@ -1407,7 +1574,7 @@ class DAGSchedulerSuite
     // Ask the scheduler to try it again; TaskSet 2 will rerun the map task that we couldn't fetch
     // from, then TaskSet 3 will run the reduce stage
     scheduler.resubmitFailedStages()
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
     complete(taskSets(3), Seq((Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
     results.clear()
@@ -1452,8 +1619,8 @@ class DAGSchedulerSuite
     // Complete the first stage
     assert(taskSets(0).stageId === 0)
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", rdd1.partitions.size)),
-      (Success, makeMapStatus("hostB", rdd1.partitions.size))))
+      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
+      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
     assert(listener1.results.size === 1)
@@ -1461,7 +1628,7 @@ class DAGSchedulerSuite
     // When attempting the second stage, show a fetch failure
     assert(taskSets(1).stageId === 1)
     complete(taskSets(1), Seq(
-      (Success, makeMapStatus("hostA", rdd2.partitions.size)),
+      (Success, makeMapStatus("hostA", rdd2.partitions.length)),
       (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
     scheduler.resubmitFailedStages()
     assert(listener2.results.size === 0)    // Second stage listener should not have a result yet
@@ -1469,7 +1636,7 @@ class DAGSchedulerSuite
     // Stage 0 should now be running as task set 2; make its task succeed
     assert(taskSets(2).stageId === 0)
     complete(taskSets(2), Seq(
-      (Success, makeMapStatus("hostC", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
     assert(listener2.results.size === 0)    // Second stage listener should still not have a result
@@ -1477,8 +1644,8 @@ class DAGSchedulerSuite
     // Stage 1 should now be running as task set 3; make its first task succeed
     assert(taskSets(3).stageId === 1)
     complete(taskSets(3), Seq(
-      (Success, makeMapStatus("hostB", rdd2.partitions.size)),
-      (Success, makeMapStatus("hostD", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostB", rdd2.partitions.length)),
+      (Success, makeMapStatus("hostD", rdd2.partitions.length))))
     assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD")))
     assert(listener2.results.size === 1)
@@ -1494,7 +1661,7 @@ class DAGSchedulerSuite
     // TaskSet 5 will rerun stage 1's lost task, then TaskSet 6 will rerun stage 2
     assert(taskSets(5).stageId === 1)
     complete(taskSets(5), Seq(
-      (Success, makeMapStatus("hostE", rdd2.partitions.size))))
+      (Success, makeMapStatus("hostE", rdd2.partitions.length))))
     complete(taskSets(6), Seq(
       (Success, 53)))
     assert(listener3.results === Map(0 -> 52, 1 -> 53))