diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index e63feb8893b63aa3cc3493de5b5332343a87dce7..19ebaf817e24ec8d27842792a8541ddf16eb65f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -874,7 +874,8 @@ private[spark] class TaskSetManager(
     // and we are not using an external shuffle server which could serve the shuffle outputs.
     // The reason is the next stage wouldn't be able to fetch the data from this dead executor
     // so we would need to rerun these tasks on other executors.
-    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled) {
+    if (tasks(0).isInstanceOf[ShuffleMapTask] && !env.blockManager.externalShuffleServiceEnabled
+        && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
         if (successful(index)) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d03a0c990a02b21f7fbe682ed32e16fed54c3b19..2c2cda9f318eb094271845e429f49b6f14ca357f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler
 
-import java.util.Random
+import java.util.{Properties, Random}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, never, spy, verify, when}
 import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.internal.Logging
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
 
@@ -664,6 +665,67 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(thrown2.getMessage().contains("bigger than spark.driver.maxResultSize"))
   }
 
+  test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+
+    val sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {}
+    })
+
+    // Keep track of the number of tasks that are resubmitted,
+    // so that the test can check that no tasks were resubmitted.
+    var resubmittedTasks = 0
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += 1
+          case _ =>
+        }
+      }
+    }
+    sched.setDAGScheduler(dagScheduler)
+
+    val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
+        override def index: Int = 0
+      }, Seq(TaskLocation("host1", "execA")), new Properties, null)
+    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
+
+    // Offer host1, which should be accepted as a PROCESS_LOCAL location
+    // by the one task in the task set
+    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+
+    // Mark the task as available for speculation, and then offer another resource,
+    // which should be used to launch a speculative copy of the task.
+    manager.speculatableTasks += singleTask.partitionId
+    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+
+    assert(manager.runningTasks === 2)
+    assert(manager.isZombie === false)
+
+    val directTaskResult = new DirectTaskResult[String](null, Seq()) {
+      override def value(resultSer: SerializerInstance): String = ""
+    }
+    // Complete one copy of the task, which should result in the task set manager
+    // being marked as a zombie, because at least one copy of its only task has completed.
+    manager.handleSuccessfulTask(task1.taskId, directTaskResult)
+    assert(manager.isZombie === true)
+    assert(resubmittedTasks === 0)
+    assert(manager.runningTasks === 1)
+
+    manager.executorLost("execB", "host2", new SlaveLost())
+    assert(manager.runningTasks === 0)
+    assert(resubmittedTasks === 0)
+  }
+
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(