diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 8a713d6f2be79dd82c855232a76c41238de20577..5adff032eb1f5f8bba719fd9a568a4e4f946d164 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -29,13 +29,13 @@ extends MScheduler with spark.Scheduler with Logging
   )
 
   // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
+  private var isRegistered = false
+  private val registeredLock = new Object()
 
-  var activeJobs = new HashMap[Int, Job]
-  var activeJobsQueue = new Queue[Job]
+  private var activeJobs = new HashMap[Int, Job]
+  private var activeJobsQueue = new Queue[Job]
 
-  private[spark] var taskIdToJobId = new HashMap[Int, Int]
+  private var taskIdToJobId = new HashMap[Int, Int]
 
   private var nextJobId = 0
@@ -126,6 +126,11 @@ extends MScheduler with spark.Scheduler with Logging
     }
   }
 
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by asking
+   * our active jobs for tasks in FIFO order. We fill each node with tasks in
+   * a round-robin manner so that tasks are balanced across the cluster.
+   */
   override def resourceOffer(
       d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
     synchronized {
@@ -159,6 +164,14 @@ extends MScheduler with spark.Scheduler with Logging
     }
   }
 
+  // Check whether a Mesos task state represents a finished task
+  def isFinished(state: TaskState) = {
+    state == TaskState.TASK_FINISHED ||
+    state == TaskState.TASK_FAILED ||
+    state == TaskState.TASK_KILLED ||
+    state == TaskState.TASK_LOST
+  }
+
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     synchronized {
       try {
@@ -167,10 +180,12 @@ extends MScheduler with spark.Scheduler with Logging
             if (activeJobs.contains(jobId)) {
               activeJobs(jobId).statusUpdate(status)
             }
+            if (isFinished(status.getState)) {
+              taskIdToJobId.remove(status.getTaskId)
+            }
           case None =>
             logInfo("TID " + status.getTaskId + " already finished")
         }
-
       } catch {
         case e: Exception => logError("Exception in statusUpdate", e)
       }
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index faf3b1c49231dfec0179d99621a00678328e5ce0..b15d0522d4083fe8063bcda66716808748f7bbd0 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -9,7 +9,7 @@ import mesos._
 
 
 /**
- * A simple implementation of Job that just runs each task in an array.
+ * A Job that runs a set of tasks with no interdependencies.
  */
 class SimpleJob[T: ClassManifest](
     sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
@@ -204,8 +204,6 @@ extends Job(jobId) with Logging
         Accumulators.add(callingThread, result.accumUpdates)
         // Mark finished and stop if we've finished all the tasks
         finished(index) = true
-        // Remove TID -> jobId mapping from sched
-        sched.taskIdToJobId.remove(tid)
         if (tasksFinished == numTasks)
           setAllFinished()
       } else {
@@ -220,7 +218,6 @@ extends Job(jobId) with Logging
       if (!finished(index)) {
         logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
         launched(index) = false
-        sched.taskIdToJobId.remove(tid)
         tasksLaunched -= 1
         // Re-enqueue the task as pending
         addPendingTask(index)
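
The behavioral core of this patch is moving the TID -> jobId cleanup out of SimpleJob's success and failure handlers and into a single terminal-state check in MesosScheduler.statusUpdate, which is what lets taskIdToJobId drop from private[spark] to fully private. Below is a minimal, self-contained sketch of that pattern, not the patched code itself: TerminalStateDemo, the stand-in TaskState Enumeration (mimicking the Mesos enum values the patch checks), and the println calls are all illustrative assumptions.

```scala
import scala.collection.mutable.HashMap

object TerminalStateDemo {
  // Stand-in for the mesos.TaskState enum; only the values the patch checks.
  object TaskState extends Enumeration {
    val TASK_STARTING, TASK_RUNNING, TASK_FINISHED,
        TASK_FAILED, TASK_KILLED, TASK_LOST = Value
  }
  import TaskState._

  // Same predicate as the patch's isFinished: true only for terminal
  // states, i.e. states a task can never leave.
  def isFinished(state: TaskState.Value): Boolean =
    state == TASK_FINISHED || state == TASK_FAILED ||
    state == TASK_KILLED || state == TASK_LOST

  // Mirrors MesosScheduler.taskIdToJobId.
  val taskIdToJobId = new HashMap[Int, Int]

  // Simplified statusUpdate: the TID -> jobId mapping is dropped here,
  // in one place, instead of inside each job's success/failure handlers.
  def statusUpdate(taskId: Int, state: TaskState.Value): Unit = synchronized {
    taskIdToJobId.get(taskId) match {
      case Some(jobId) =>
        println(s"forwarding $state for TID $taskId to job $jobId")
        if (isFinished(state)) taskIdToJobId.remove(taskId)
      case None =>
        println(s"TID $taskId already finished")
    }
  }

  def main(args: Array[String]): Unit = {
    taskIdToJobId(1) = 42
    statusUpdate(1, TASK_RUNNING) // non-terminal: mapping survives
    assert(taskIdToJobId.contains(1))
    statusUpdate(1, TASK_LOST)    // terminal: mapping is cleaned up
    assert(!taskIdToJobId.contains(1))
  }
}
```

Centralizing the removal this way means the map is cleared on every terminal update Mesos delivers, rather than relying on each job to remember to clean up after itself in both its success and task-lost paths.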