Skip to content
Snippets Groups Projects
Commit bf21bb28 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Further clarified some code

parent c21f840a
No related branches found
No related tags found
No related merge requests found
......@@ -29,13 +29,13 @@ extends MScheduler with spark.Scheduler with Logging
)
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
private var isRegistered = false
private val registeredLock = new Object()
var activeJobs = new HashMap[Int, Job]
var activeJobsQueue = new Queue[Job]
private var activeJobs = new HashMap[Int, Job]
private var activeJobsQueue = new Queue[Job]
private[spark] var taskIdToJobId = new HashMap[Int, Int]
private var taskIdToJobId = new HashMap[Int, Int]
private var nextJobId = 0
......@@ -126,6 +126,11 @@ extends MScheduler with spark.Scheduler with Logging
}
}
/**
* Method called by Mesos to offer resources on slaves. We resond by asking
* our active jobs for tasks in FIFO order. We fill each node with tasks in
* a round-robin manner so that tasks are balanced across the cluster.
*/
override def resourceOffer(
d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
synchronized {
......@@ -159,6 +164,14 @@ extends MScheduler with spark.Scheduler with Logging
}
}
// Check whether a Mesos task state represents a finished task
def isFinished(state: TaskState) = {
state == TaskState.TASK_FINISHED ||
state == TaskState.TASK_FAILED ||
state == TaskState.TASK_KILLED ||
state == TaskState.TASK_LOST
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
synchronized {
try {
......@@ -167,10 +180,12 @@ extends MScheduler with spark.Scheduler with Logging
if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status)
}
if (isFinished(status.getState)) {
taskIdToJobId.remove(status.getTaskId)
}
case None =>
logInfo("TID " + status.getTaskId + " already finished")
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
......
......@@ -9,7 +9,7 @@ import mesos._
/**
* A simple implementation of Job that just runs each task in an array.
* A Job that runs a set of tasks with no interdependencies.
*/
class SimpleJob[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
......@@ -204,8 +204,6 @@ extends Job(jobId) with Logging
Accumulators.add(callingThread, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
// Remove TID -> jobId mapping from sched
sched.taskIdToJobId.remove(tid)
if (tasksFinished == numTasks)
setAllFinished()
} else {
......@@ -220,7 +218,6 @@ extends Job(jobId) with Logging
if (!finished(index)) {
logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
launched(index) = false
sched.taskIdToJobId.remove(tid)
tasksLaunched -= 1
// Re-enqueue the task as pending
addPendingTask(index)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment