Skip to content
Snippets Groups Projects
Commit aef9e5b9 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Renamed ParallelOperation to Job

parent b6debf5d
No related branches found
No related tags found
No related merge requests found
......@@ -18,9 +18,9 @@ import mesos._
// 1) Right now, the scheduler uses a linear scan through the tasks to find a
// local one for a given node. It would be faster to have a separate list of
// pending tasks for each node.
// 2) Presenting a single slave in ParallelOperation.slaveOffer makes it
// 2) Presenting a single slave in Job.slaveOffer makes it
// difficult to balance tasks across nodes. It would be better to pass
// all the offers to the ParallelOperation and have it load-balance.
// all the offers to the Job and have it load-balance.
private class MesosScheduler(
master: String, frameworkName: String, execArg: Array[Byte])
extends NScheduler with spark.Scheduler with Logging
......@@ -33,14 +33,14 @@ extends NScheduler with spark.Scheduler with Logging
val registeredLock = new Object()
// Current callback object (may be null)
var activeOpsQueue = new Queue[Int]
var activeOps = new HashMap[Int, ParallelOperation]
private var nextOpId = 0
private[spark] var taskIdToOpId = new HashMap[Int, Int]
var activeJobsQueue = new Queue[Int]
var activeJobs = new HashMap[Int, Job]
private var nextJobId = 0
private[spark] var taskIdToJobId = new HashMap[Int, Int]
def newOpId(): Int = {
val id = nextOpId
nextOpId += 1
def newJobId(): Int = {
val id = nextJobId
nextJobId += 1
return id
}
......@@ -73,31 +73,31 @@ extends NScheduler with spark.Scheduler with Logging
new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
var opId = 0
var jobId = 0
waitForRegister()
this.synchronized {
opId = newOpId()
jobId = newJobId()
}
val myOp = new SimpleParallelOperation(this, tasks, opId)
val myJob = new SimpleJob(this, tasks, jobId)
try {
this.synchronized {
this.activeOps(myOp.opId) = myOp
this.activeOpsQueue += myOp.opId
this.activeJobs(myJob.jobId) = myJob
this.activeJobsQueue += myJob.jobId
}
driver.reviveOffers();
myOp.join();
myJob.join();
} finally {
this.synchronized {
this.activeOps.remove(myOp.opId)
this.activeOpsQueue.dequeueAll(x => (x == myOp.opId))
this.activeJobs.remove(myJob.jobId)
this.activeJobsQueue.dequeueAll(x => (x == myJob.jobId))
}
}
if (myOp.errorHappened)
throw new SparkException(myOp.errorMessage, myOp.errorCode)
if (myJob.errorHappened)
throw new SparkException(myJob.errorMessage, myJob.errorCode)
else
return myOp.results
return myJob.results
}
override def registered(d: SchedulerDriver, frameworkId: String) {
......@@ -122,13 +122,13 @@ extends NScheduler with spark.Scheduler with Logging
val availableCpus = offers.map(_.getParams.get("cpus").toInt)
val availableMem = offers.map(_.getParams.get("mem").toInt)
var launchedTask = true
for (opId <- activeOpsQueue) {
for (jobId <- activeJobsQueue) {
launchedTask = true
while (launchedTask) {
launchedTask = false
for (i <- 0 until offers.size.toInt) {
try {
activeOps(opId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
activeJobs(jobId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
case Some(task) =>
tasks.add(task)
availableCpus(i) -= task.getParams.get("cpus").toInt
......@@ -151,10 +151,10 @@ extends NScheduler with spark.Scheduler with Logging
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
synchronized {
try {
taskIdToOpId.get(status.getTaskId) match {
case Some(opId) =>
if (activeOps.contains(opId)) {
activeOps(opId).statusUpdate(status)
taskIdToJobId.get(status.getTaskId) match {
case Some(jobId) =>
if (activeJobs.contains(jobId)) {
activeJobs(jobId).statusUpdate(status)
}
case None =>
logInfo("TID " + status.getTaskId + " already finished")
......@@ -168,10 +168,10 @@ extends NScheduler with spark.Scheduler with Logging
override def error(d: SchedulerDriver, code: Int, message: String) {
synchronized {
if (activeOps.size > 0) {
for ((opId, activeOp) <- activeOps) {
if (activeJobs.size > 0) {
for ((jobId, activeJob) <- activeJobs) {
try {
activeOp.error(code, message)
activeJob.error(code, message)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
......@@ -195,16 +195,16 @@ extends NScheduler with spark.Scheduler with Logging
// Trait representing an object that manages a parallel operation by
// implementing various scheduler callbacks.
trait ParallelOperation {
trait Job {
def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int): Option[TaskDescription]
def statusUpdate(t: TaskStatus): Unit
def error(code: Int, message: String): Unit
}
class SimpleParallelOperation[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val opId: Int)
extends ParallelOperation with Logging
class SimpleJob[T: ClassManifest](
sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
extends Job with Logging
{
// Maximum time to wait to run a task in a preferred location (in ms)
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
......@@ -258,12 +258,12 @@ extends ParallelOperation with Logging
tasks(i).preferredLocations.isEmpty))
{
val taskId = sched.newTaskId()
sched.taskIdToOpId(taskId) = opId
sched.taskIdToJobId(taskId) = jobId
tidToIndex(taskId) = i
val preferred = if(checkPref) "preferred" else "non-preferred"
val message =
"Starting task %d as opId %d, TID %s on slave %s: %s (%s)".format(
i, opId, taskId, offer.getSlaveId, offer.getHost, preferred)
"Starting task %d as jobId %d, TID %s on slave %s: %s (%s)".format(
i, jobId, taskId, offer.getSlaveId, offer.getHost, preferred)
logInfo(message)
tasks(i).markStarted(offer)
launched(i) = true
......@@ -302,8 +302,8 @@ extends ParallelOperation with Logging
val index = tidToIndex(tid)
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished opId %d TID %d (progress: %d/%d)".format(
opId, tid, tasksFinished, numTasks))
logInfo("Finished job %d TID %d (progress: %d/%d)".format(
jobId, tid, tasksFinished, numTasks))
// Deserialize task result
val result = Utils.deserialize[TaskResult[T]](status.getData)
results(index) = result.value
......@@ -311,8 +311,8 @@ extends ParallelOperation with Logging
Accumulators.add(callingThread, result.accumUpdates)
// Mark finished and stop if we've finished all the tasks
finished(index) = true
// Remove TID -> opId mapping from sched
sched.taskIdToOpId.remove(tid)
// Remove TID -> jobId mapping from sched
sched.taskIdToJobId.remove(tid)
if (tasksFinished == numTasks)
setAllFinished()
} else {
......@@ -325,9 +325,9 @@ extends ParallelOperation with Logging
val tid = status.getTaskId
val index = tidToIndex(tid)
if (!finished(index)) {
logInfo("Lost opId " + opId + " TID " + tid)
logInfo("Lost job " + jobId + " TID " + tid)
launched(index) = false
sched.taskIdToOpId.remove(tid)
sched.taskIdToJobId.remove(tid)
tasksLaunched -= 1
} else {
logInfo("Ignoring task-lost event for TID " + tid +
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment