diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 4b19beb43fd6b6d8ae37509caf173fd9cb79374a..cf97877476d54e630dbf146bff825325d79148a2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -114,14 +114,9 @@ private[spark] class TaskSetManager(
   // treated as stacks, in which new tasks are added to the end of the
   // ArrayBuffer and removed from the end. This makes it faster to detect
   // tasks that repeatedly fail because whenever a task failed, it is put
-  // back at the head of the stack. These collections may contain duplicates
-  // for two reasons:
-  // (1): Tasks are only removed lazily; when a task is launched, it remains
-  // in all the pending lists except the one that it was launched from.
-  // (2): Tasks may be re-added to these lists multiple times as a result
-  // of failures.
-  // Duplicates are handled in dequeueTaskFromList, which ensures that a
-  // task hasn't already started running before launching it.
+  // back at the head of the stack. They are also only cleaned up lazily;
+  // when a task is launched, it remains in all the pending lists except
+  // the one that it was launched from, but gets removed from them later.
   private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
 
   // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
@@ -186,7 +181,9 @@ private[spark] class TaskSetManager(
   private def addPendingTask(index: Int) {
     // Utility method that adds `index` to a list only if it's not already there
     def addTo(list: ArrayBuffer[Int]) {
-      list += index
+      if (!list.contains(index)) {
+        list += index
+      }
     }
 
     for (loc <- tasks(index).preferredLocations) {
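
For illustration, a minimal standalone sketch of the guarded-append pattern that the patched addTo introduces. This is not Spark code; the object name PendingListSketch and the method addPending are assumptions made up for the example.

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Minimal sketch, not Spark code: demonstrates the guarded append that the
// patched addTo performs on the per-executor pending lists.
object PendingListSketch {
  private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

  def addPending(executorId: String, index: Int): Unit = {
    val list = pendingTasksForExecutor.getOrElseUpdate(executorId, new ArrayBuffer[Int])
    // Skip the append if the task index is already queued; this trades an
    // O(n) scan per insertion for duplicate-free pending lists.
    if (!list.contains(index)) {
      list += index
    }
  }

  def main(args: Array[String]): Unit = {
    addPending("exec-1", 42)
    addPending("exec-1", 42) // second call is a no-op
    println(pendingTasksForExecutor("exec-1")) // ArrayBuffer(42)
  }
}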