Skip to content
Snippets Groups Projects
Commit 787faf0d authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fixed a bug with scheduling of tasks that have no locality preferences.

These tasks were being subjected to delay scheduling but then counted as
having been launched on a preferred node. The solution is to have a
separate queue for them and treat them as preferred during scheduling.
parent 0e0ec835
No related branches found
No related tags found
No related merge requests found
...@@ -51,6 +51,9 @@ extends Job(jobId) with Logging ...@@ -51,6 +51,9 @@ extends Job(jobId) with Logging
// the one that it was launched from, but gets removed from them later. // the one that it was launched from, but gets removed from them later.
val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]] val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// List containing pending tasks with no locality preferences
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// List containing all pending tasks (also used as a stack, as above) // List containing all pending tasks (also used as a stack, as above)
val allPendingTasks = new ArrayBuffer[Int] val allPendingTasks = new ArrayBuffer[Int]
...@@ -66,11 +69,16 @@ extends Job(jobId) with Logging ...@@ -66,11 +69,16 @@ extends Job(jobId) with Logging
// Add a task to all the pending-task lists that it should be on. // Add a task to all the pending-task lists that it should be on.
def addPendingTask(index: Int) { def addPendingTask(index: Int) {
allPendingTasks += index val locations = tasks(index).preferredLocations
for (host <- tasks(index).preferredLocations) { if (locations.size == 0) {
val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer()) pendingTasksWithNoPrefs += index
list += index } else {
for (host <- locations) {
val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
list += index
}
} }
allPendingTasks += index
} }
// Mark the job as finished and wake up any threads waiting on it // Mark the job as finished and wake up any threads waiting on it
...@@ -103,6 +111,8 @@ extends Job(jobId) with Logging ...@@ -103,6 +111,8 @@ extends Job(jobId) with Logging
// Dequeue a pending task from the given list and return its index. // Dequeue a pending task from the given list and return its index.
// Return None if the list is empty. // Return None if the list is empty.
// This method also cleans up any tasks in the list that have already
// been launched, since we want that to happen lazily.
def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = { def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) { while (!list.isEmpty) {
val index = list.last val index = list.last
...@@ -117,11 +127,18 @@ extends Job(jobId) with Logging ...@@ -117,11 +127,18 @@ extends Job(jobId) with Logging
// Dequeue a pending task for a given node and return its index. // Dequeue a pending task for a given node and return its index.
// If localOnly is set to false, allow non-local tasks as well. // If localOnly is set to false, allow non-local tasks as well.
def findTask(host: String, localOnly: Boolean): Option[Int] = { def findTask(host: String, localOnly: Boolean): Option[Int] = {
findTaskFromList(getPendingTasksForHost(host)) match { val localTask = findTaskFromList(getPendingTasksForHost(host))
case Some(task) => Some(task) if (localTask != None) {
case None => return localTask
if (localOnly) None }
else findTaskFromList(allPendingTasks) val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
if (noPrefTask != None) {
return noPrefTask
}
if (!localOnly) {
return findTaskFromList(allPendingTasks) // Look for non-local task
} else {
return None
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment