diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala index b15d0522d4083fe8063bcda66716808748f7bbd0..09846ccc34a1f26000b93ea1fc2e7f0034be2a21 100644 --- a/src/scala/spark/SimpleJob.scala +++ b/src/scala/spark/SimpleJob.scala @@ -51,6 +51,9 @@ extends Job(jobId) with Logging // the one that it was launched from, but gets removed from them later. val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] + // List containing pending tasks with no locality preferences + val pendingTasksWithNoPrefs = new ArrayBuffer[Int] + // List containing all pending tasks (also used as a stack, as above) val allPendingTasks = new ArrayBuffer[Int] @@ -66,11 +69,16 @@ extends Job(jobId) with Logging // Add a task to all the pending-task lists that it should be on. def addPendingTask(index: Int) { - allPendingTasks += index - for (host <- tasks(index).preferredLocations) { - val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) - list += index + val locations = tasks(index).preferredLocations + if (locations.size == 0) { + pendingTasksWithNoPrefs += index + } else { + for (host <- locations) { + val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) + list += index + } } + allPendingTasks += index } // Mark the job as finished and wake up any threads waiting on it @@ -103,6 +111,8 @@ extends Job(jobId) with Logging // Dequeue a pending task from the given list and return its index. // Return None if the list is empty. + // This method also cleans up any tasks in the list that have already + // been launched, since we want that to happen lazily. def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { while (!list.isEmpty) { val index = list.last @@ -117,11 +127,18 @@ extends Job(jobId) with Logging // Dequeue a pending task for a given node and return its index. // If localOnly is set to false, allow non-local tasks as well. def findTask(host: String, localOnly: Boolean): Option[Int] = { - findTaskFromList(getPendingTasksForHost(host)) match { - case Some(task) => Some(task) - case None => - if (localOnly) None - else findTaskFromList(allPendingTasks) + val localTask = findTaskFromList(getPendingTasksForHost(host)) + if (localTask != None) { + return localTask + } + val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs) + if (noPrefTask != None) { + return noPrefTask + } + if (!localOnly) { + return findTaskFromList(allPendingTasks) // Look for non-local task + } else { + return None } }