diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index cf97877476d54e630dbf146bff825325d79148a2..05cfa52f16b3e820e4c72d5392b8269038584a3c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -114,9 +114,14 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task fails, it is put
-  // back at the head of the stack. They are also only cleaned up lazily;
-  // when a task is launched, it remains in all the pending lists except
-  // the one that it was launched from, but gets removed from them later.
+  // back at the head of the stack. These collections may contain duplicates
+  // for two reasons:
+  // (1) Tasks are only removed lazily; when a task is launched, it remains
+  // in all the pending lists except the one that it was launched from.
+  // (2) Tasks may be re-added to these lists multiple times as a result
+  // of failures.
+  // Duplicates are handled in dequeueTaskFromList, which ensures that a
+  // task hasn't already started running before launching it.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -179,23 +184,16 @@ private[spark] class TaskSetManager(
 
   /** Add a task to all the pending-task lists that it should be on. */
   private def addPendingTask(index: Int) {
-    // Utility method that adds `index` to a list only if it's not already there
-    def addTo(list: ArrayBuffer[Int]) {
-      if (!list.contains(index)) {
-        list += index
-      }
-    }
-
     for (loc <- tasks(index).preferredLocations) {
       loc match {
         case e: ExecutorCacheTaskLocation =>
-          addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))
+          pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
         case e: HDFSCacheTaskLocation => {
           val exe = sched.getExecutorsAliveOnHost(loc.host)
           exe match {
             case Some(set) => {
               for (e <- set) {
-                addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))
+                pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
               }
               logInfo(s"Pending task $index has a cached location at ${e.host} " +
                 ", where there are executors " + set.mkString(","))
@@ -206,14 +204,14 @@ private[spark] class TaskSetManager(
         }
        case _ =>
       }
-      addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+      pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
       for (rack <- sched.getRackForHost(loc.host)) {
-        addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+        pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
       }
     }
 
     if (tasks(index).preferredLocations == Nil) {
-      addTo(pendingTasksWithNoPrefs)
+      pendingTasksWithNoPrefs += index
     }
 
     allPendingTasks += index  // No point scanning this whole list to find the old task there
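
For reference, dequeueTaskFromList itself is not shown in this diff. The duplicate check that the new comment describes works roughly as in the sketch below; this is an assumption based on the comment, and the bookkeeping fields copiesRunning and successful are taken to be TaskSetManager's usual per-task arrays, which are not part of this excerpt:

  // Scan from the end of the list, since the pending lists are treated as
  // stacks. Stale entries are expected here: removal is lazy and failed
  // tasks can be re-added, so an index may appear in several lists or more
  // than once in the same list.
  private def dequeueTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
    var indexOffset = list.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = list(indexOffset)
      // Drop the entry unconditionally; any duplicates are cleaned up the
      // same way when their own list is scanned.
      list.remove(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        // Not running anywhere and not already finished, so it is safe
        // to hand this task out.
        return Some(index)
      }
    }
    None
  }

This is presumably also why addPendingTask drops the addTo/contains guard: ArrayBuffer.contains is O(n) per call, so appending unconditionally and filtering stale entries at dequeue time keeps addPendingTask cheap and pushes the deduplication cost onto a scan that already happens on dequeue.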