diff --git a/src/scala/spark/Job.scala b/src/scala/spark/Job.scala
index 6b01307adcc5e06d0e65ffa25f461b0406270c86..6abbcbce5153adca58792f29c3e7cca8c6691903 100644
--- a/src/scala/spark/Job.scala
+++ b/src/scala/spark/Job.scala
@@ -3,14 +3,16 @@ package spark
 import mesos._
 
 /**
- * Trait representing a parallel job in MesosScheduler. Schedules the
+ * Class representing a parallel job in MesosScheduler. Schedules the
  * job by implementing various callbacks.
  */
-trait Job {
+abstract class Job(jobId: Int) {
   def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
     : Option[TaskDescription]
 
   def statusUpdate(t: TaskStatus): Unit
 
   def error(code: Int, message: String): Unit
+
+  def getId(): Int = jobId
 }
diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 87ebcb86920ff99f460d3b13c8297dd1870152d9..8a713d6f2be79dd82c855232a76c41238de20577 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -141,6 +141,7 @@ extends MScheduler with spark.Scheduler with Logging
           job.slaveOffer(offers(i), availableCpus(i), availableMem(i)) match {
             case Some(task) =>
               tasks.add(task)
+              taskIdToJobId(task.getTaskId) = job.getId
               availableCpus(i) -= task.getParams.get("cpus").toInt
               availableMem(i) -= task.getParams.get("mem").toInt
               launchedTask = true
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index a8544e4474bb8f4b868b729853afbf8525677135..d0abaa4b67aa1d0adaf28de04d1d8f13d0f11d8d 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -2,8 +2,8 @@ package spark
 
 import java.util.{HashMap => JHashMap}
 
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
 
 import mesos._
 
@@ -13,7 +13,7 @@ import mesos._
  */
 class SimpleJob[T: ClassManifest](
   sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
-extends Job with Logging
+extends Job(jobId) with Logging
 {
   // Maximum time to wait to run a task in a preferred location (in ms)
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -42,24 +42,34 @@ extends Job with Logging
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
 
-  // Queue of pending tasks for each node
-  val pendingTasksForNode = new HashMap[String, Queue[Int]]
+  // List of pending tasks for each host. These collections are actually
+  // treated as stacks, in which new tasks are added to the end of the
+  // ArrayBuffer and removed from the end. This makes it faster to detect
+  // tasks that repeatedly fail because whenever a task fails, it is put
+  // back on top of the stack. They are also cleaned up only lazily: when
+  // a task is launched, it remains in all the pending lists except the
+  // one it was launched from, but gets removed from them later.
+  val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
 
-  // Queue containing all pending tasks
-  val allPendingTasks = new Queue[Int]
+  // List containing all pending tasks (also used as a stack, as above)
+  val allPendingTasks = new ArrayBuffer[Int]
 
   // Did the job fail?
   var failed = false
   var causeOfFailure = ""
 
-  for (i <- 0 until numTasks) {
+  // Add all our tasks to the pending lists. We do this in reverse order
+  // of task index so that tasks with low indices get launched first.
+  for (i <- (0 until numTasks).reverse) {
     addPendingTask(i)
  }
 
+  // Add a task to all the pending-task lists that it should be on.
   def addPendingTask(index: Int) {
     allPendingTasks += index
     for (host <- tasks(index).preferredLocations) {
-      pendingTasksForNode(host) += index
+      val list = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
+      list += index
     }
   }
 
@@ -85,15 +95,18 @@ extends Job with Logging
     }
   }
 
-  def getPendingTasksForNode(host: String): Queue[Int] = {
-    pendingTasksForNode.getOrElse(host, Queue())
+  // Return the pending tasks list for a given host, or an empty list if
+  // there is no map entry for that host
+  def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
+    pendingTasksForHost.getOrElse(host, ArrayBuffer())
   }
 
-  // Dequeue a pending task from the given queue and return its index.
-  // Return None if the queue is empty.
-  def findTaskFromQueue(queue: Queue[Int]): Option[Int] = {
-    while (!queue.isEmpty) {
-      val index = queue.dequeue
+  // Dequeue a pending task from the given list and return its index.
+  // Return None if the list is empty.
+  def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
+    while (!list.isEmpty) {
+      val index = list.last
+      list.trimEnd(1)
       if (!launched(index) && !finished(index)) {
         return Some(index)
       }
@@ -104,19 +117,23 @@ extends Job with Logging
   // Dequeue a pending task for a given node and return its index.
   // If localOnly is set to false, allow non-local tasks as well.
   def findTask(host: String, localOnly: Boolean): Option[Int] = {
-    findTaskFromQueue(getPendingTasksForNode(host)) match {
+    findTaskFromList(getPendingTasksForHost(host)) match {
       case Some(task) => Some(task)
       case None =>
         if (localOnly) None
-        else findTaskFromQueue(allPendingTasks)
+        else findTaskFromList(allPendingTasks)
     }
   }
 
+  // Does a host count as a preferred location for a task? This is true if
+  // either the task has preferred locations and this host is one, or the task
+  // has no preferred locations (in which case we still count it as preferred).
   def isPreferredLocation(task: Task[T], host: String): Boolean = {
     val locs = task.preferredLocations
     return (locs.contains(host) || locs.isEmpty)
   }
 
+  // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(offer: SlaveOffer, availableCpus: Int, availableMem: Int)
     : Option[TaskDescription] = {
     if (tasksLaunched < numTasks && availableCpus >= CPUS_PER_TASK &&
@@ -126,9 +143,10 @@ extends Job with Logging
       val host = offer.getHost
       findTask(host, localOnly) match {
         case Some(index) => {
+          // Found a task; do some bookkeeping and return a Mesos task for it
          val task = tasks(index)
           val taskId = sched.newTaskId()
-          // Figure out whether the task's location is preferred
+          // Figure out whether this should count as a preferred launch
           val preferred = isPreferredLocation(task, host)
           val prefStr = if(preferred) "preferred" else "non-preferred"
           val message =
@@ -136,7 +154,6 @@ extends Job with Logging
             index, jobId, taskId, offer.getSlaveId, host, prefStr)
           logInfo(message)
           // Do various bookkeeping
-          sched.taskIdToJobId(taskId) = jobId
           tidToIndex(taskId) = index
           task.markStarted(offer)
           launched(index) = true
@@ -145,12 +162,13 @@ extends Job with Logging
             lastPreferredLaunchTime = time
           // Create and return the Mesos task object
           val params = new JHashMap[String, String]
-          params.put("cpus", "" + CPUS_PER_TASK)
-          params.put("mem", "" + MEM_PER_TASK)
+          params.put("cpus", CPUS_PER_TASK.toString)
+          params.put("mem", MEM_PER_TASK.toString)
           val serializedTask = Utils.serialize(task)
           logDebug("Serialized size: " + serializedTask.size)
-          return Some(new TaskDescription(taskId, offer.getSlaveId,
-            "task_" + taskId, params, serializedTask))
+          val taskName = "task %d:%d".format(jobId, taskId)
+          return Some(new TaskDescription(
+            taskId, offer.getSlaveId, taskName, params, serializedTask))
         }
         case _ =>
       }
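
Note for reviewers: the stack discipline described in the comment on pendingTasksForHost is easiest to see outside the diff. The toy sketch below (not part of this patch; PendingStackSketch is a hypothetical name) shows why adding indices in reverse order makes low indices launch first, and why a failed task pushed back onto the list is retried before anything else:

    import scala.collection.mutable.ArrayBuffer

    // Illustration of the pending lists' stack discipline: appends push
    // onto the top (end of the ArrayBuffer), findTaskFromList pops from
    // the top with list.last / list.trimEnd(1).
    object PendingStackSketch {
      def main(args: Array[String]) {
        val numTasks = 4
        val allPendingTasks = new ArrayBuffer[Int]
        for (i <- (0 until numTasks).reverse)
          allPendingTasks += i            // stack is now 3, 2, 1, 0 (top)
        println(allPendingTasks.last)     // 0: the lowest index launches first
        allPendingTasks.trimEnd(1)        // pop it for launch
        allPendingTasks += 0              // suppose task 0 fails: re-push it
        println(allPendingTasks.last)     // 0 is back on top, retried next
      }
    }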
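
The localOnly flag passed to findTask comes from the delay-scheduling check in slaveOffer, which this patch shows only in part. A minimal sketch of that check, reusing SimpleJob's field names; the exact expression is not visible in this diff, so it is an assumption here:

    // Sketch (assumed, not from this patch) of the delay-scheduling test
    // that produces findTask's localOnly argument.
    object DelaySchedulingSketch {
      // Maximum time to wait to run a task in a preferred location (in ms)
      val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
      // Last time a preferred (local) task was launched
      var lastPreferredLaunchTime = System.currentTimeMillis

      // True while we should still hold out for a preferred host; once
      // LOCALITY_WAIT ms pass without a preferred launch, findTask is
      // called with localOnly = false and may return a non-local task.
      def localOnly(): Boolean =
        System.currentTimeMillis - lastPreferredLaunchTime < LOCALITY_WAIT
    }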