From aef9e5b98c0ebd23fb9a85830ba0ea993f826f82 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sun, 3 Oct 2010 13:28:01 -0700
Subject: [PATCH] Renamed ParallelOperation to Job

---
 src/scala/spark/MesosScheduler.scala | 84 ++++++++++++++--------------
 1 file changed, 42 insertions(+), 42 deletions(-)

diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 873a97c59c..ecb95d7c0b 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -18,9 +18,9 @@ import mesos._
 // 1) Right now, the scheduler uses a linear scan through the tasks to find a
 //    local one for a given node. It would be faster to have a separate list of
 //    pending tasks for each node.
-// 2) Presenting a single slave in ParallelOperation.slaveOffer makes it
+// 2) Presenting a single slave in Job.slaveOffer makes it
 //    difficult to balance tasks across nodes. It would be better to pass
-//    all the offers to the ParallelOperation and have it load-balance.
+//    all the offers to the Job and have it load-balance.
 private class MesosScheduler(
   master: String, frameworkName: String, execArg: Array[Byte])
 extends NScheduler with spark.Scheduler with Logging
@@ -33,14 +33,14 @@ extends NScheduler with spark.Scheduler with Logging
   val registeredLock = new Object()
 
   // Current callback object (may be null)
-  var activeOpsQueue = new Queue[Int]
-  var activeOps = new HashMap[Int, ParallelOperation]
-  private var nextOpId = 0
-  private[spark] var taskIdToOpId = new HashMap[Int, Int]
+  var activeJobsQueue = new Queue[Int]
+  var activeJobs = new HashMap[Int, Job]
+  private var nextJobId = 0
+  private[spark] var taskIdToJobId = new HashMap[Int, Int]
 
-  def newOpId(): Int = {
-    val id = nextOpId
-    nextOpId += 1
+  def newJobId(): Int = {
+    val id = nextJobId
+    nextJobId += 1
     return id
   }
 
@@ -73,31 +73,31 @@ extends NScheduler with spark.Scheduler with Logging
     new ExecutorInfo(new File("spark-executor").getCanonicalPath(), execArg)
 
   override def runTasks[T: ClassManifest](tasks: Array[Task[T]]): Array[T] = {
-    var opId = 0
+    var jobId = 0
     waitForRegister()
     this.synchronized {
-      opId = newOpId()
+      jobId = newJobId()
     }
-    val myOp = new SimpleParallelOperation(this, tasks, opId)
+    val myJob = new SimpleJob(this, tasks, jobId)
 
     try {
       this.synchronized {
-        this.activeOps(myOp.opId) = myOp
-        this.activeOpsQueue += myOp.opId
+        this.activeJobs(myJob.jobId) = myJob
+        this.activeJobsQueue += myJob.jobId
       }
       driver.reviveOffers();
-      myOp.join();
+      myJob.join();
     } finally {
       this.synchronized {
-        this.activeOps.remove(myOp.opId)
-        this.activeOpsQueue.dequeueAll(x => (x == myOp.opId))
+        this.activeJobs.remove(myJob.jobId)
+        this.activeJobsQueue.dequeueAll(x => (x == myJob.jobId))
      }
     }
 
-    if (myOp.errorHappened)
-      throw new SparkException(myOp.errorMessage, myOp.errorCode)
+    if (myJob.errorHappened)
+      throw new SparkException(myJob.errorMessage, myJob.errorCode)
     else
-      return myOp.results
+      return myJob.results
   }
 
   override def registered(d: SchedulerDriver, frameworkId: String) {
@@ -122,13 +122,13 @@ extends NScheduler with spark.Scheduler with Logging
       val availableCpus = offers.map(_.getParams.get("cpus").toInt)
       val availableMem = offers.map(_.getParams.get("mem").toInt)
       var launchedTask = true
-      for (opId <- activeOpsQueue) {
+      for (jobId <- activeJobsQueue) {
         launchedTask = true
         while (launchedTask) {
           launchedTask = false
           for (i <- 0 until offers.size.toInt) {
             try {
-              activeOps(opId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
+              activeJobs(jobId).slaveOffer(offers.get(i), availableCpus(i), availableMem(i)) match {
                 case Some(task) =>
                   tasks.add(task)
                   availableCpus(i) -= task.getParams.get("cpus").toInt
@@ -151,10 +151,10 @@ extends NScheduler with spark.Scheduler with Logging
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     synchronized {
       try {
-        taskIdToOpId.get(status.getTaskId) match {
-          case Some(opId) =>
-            if (activeOps.contains(opId)) {
-              activeOps(opId).statusUpdate(status)
+        taskIdToJobId.get(status.getTaskId) match {
+          case Some(jobId) =>
+            if (activeJobs.contains(jobId)) {
+              activeJobs(jobId).statusUpdate(status)
             }
           case None =>
             logInfo("TID " + status.getTaskId + " already finished")
@@ -168,10 +168,10 @@ extends NScheduler with spark.Scheduler with Logging
 
   override def error(d: SchedulerDriver, code: Int, message: String) {
     synchronized {
-      if (activeOps.size > 0) {
-        for ((opId, activeOp) <- activeOps) {
+      if (activeJobs.size > 0) {
+        for ((jobId, activeJob) <- activeJobs) {
           try {
-            activeOp.error(code, message)
+            activeJob.error(code, message)
           } catch {
             case e: Exception => logError("Exception in error callback", e)
           }
@@ -195,16 +195,16 @@ extends NScheduler with spark.Scheduler with Logging
 
 // Trait representing an object that manages a parallel operation by
 // implementing various scheduler callbacks.
-trait ParallelOperation {
+trait Job {
   def slaveOffer(s: SlaveOffer, availableCpus: Int, availableMem: Int)
     : Option[TaskDescription]
   def statusUpdate(t: TaskStatus): Unit
   def error(code: Int, message: String): Unit
 }
 
-class SimpleParallelOperation[T: ClassManifest](
-  sched: MesosScheduler, tasks: Array[Task[T]], val opId: Int)
-extends ParallelOperation with Logging
+class SimpleJob[T: ClassManifest](
+  sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
+extends Job with Logging
 {
   // Maximum time to wait to run a task in a preferred location (in ms)
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
@@ -258,12 +258,12 @@ extends ParallelOperation with Logging
            tasks(i).preferredLocations.isEmpty)) {
         val taskId = sched.newTaskId()
-        sched.taskIdToOpId(taskId) = opId
+        sched.taskIdToJobId(taskId) = jobId
         tidToIndex(taskId) = i
         val preferred = if(checkPref) "preferred" else "non-preferred"
         val message =
-          "Starting task %d as opId %d, TID %s on slave %s: %s (%s)".format(
-            i, opId, taskId, offer.getSlaveId, offer.getHost, preferred)
+          "Starting task %d as jobId %d, TID %s on slave %s: %s (%s)".format(
+            i, jobId, taskId, offer.getSlaveId, offer.getHost, preferred)
         logInfo(message)
         tasks(i).markStarted(offer)
         launched(i) = true
@@ -302,8 +302,8 @@ extends ParallelOperation with Logging
       val index = tidToIndex(tid)
       if (!finished(index)) {
         tasksFinished += 1
-        logInfo("Finished opId %d TID %d (progress: %d/%d)".format(
-          opId, tid, tasksFinished, numTasks))
+        logInfo("Finished job %d TID %d (progress: %d/%d)".format(
+          jobId, tid, tasksFinished, numTasks))
         // Deserialize task result
         val result = Utils.deserialize[TaskResult[T]](status.getData)
         results(index) = result.value
@@ -311,8 +311,8 @@ extends ParallelOperation with Logging
         Accumulators.add(callingThread, result.accumUpdates)
         // Mark finished and stop if we've finished all the tasks
         finished(index) = true
-        // Remove TID -> opId mapping from sched
-        sched.taskIdToOpId.remove(tid)
+        // Remove TID -> jobId mapping from sched
+        sched.taskIdToJobId.remove(tid)
         if (tasksFinished == numTasks)
           setAllFinished()
       } else {
@@ -325,9 +325,9 @@ extends ParallelOperation with Logging
       val tid = status.getTaskId
       val index = tidToIndex(tid)
       if (!finished(index)) {
-        logInfo("Lost opId " + opId + " TID " + tid)
+        logInfo("Lost job " + jobId + " TID " + tid)
         launched(index) = false
-        sched.taskIdToOpId.remove(tid)
+        sched.taskIdToJobId.remove(tid)
         tasksLaunched -= 1
       } else {
         logInfo("Ignoring task-lost event for TID " + tid +
-- 
GitLab
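
For readers without the rest of the tree: the Job trait this patch renames (formerly
ParallelOperation) is driven by three scheduler callbacks, with MesosScheduler.resourceOffer
repeatedly offering each slave's resources to queued jobs until none accepts. The sketch below
is a toy, self-contained Scala illustration of that protocol, not Spark's actual code: the
Offer, Launched, and Status case classes and the ToyJob class are hypothetical stand-ins for
the Mesos types (SlaveOffer, TaskDescription, TaskStatus) so it runs without the mesos jar.

// Toy sketch of the Job callback protocol. Offer/Launched/Status/ToyJob
// are invented stand-ins for the Mesos types used in the real scheduler.
import scala.collection.mutable.{HashMap, Queue}

case class Offer(slaveId: String, cpus: Int, mem: Int) // stand-in for SlaveOffer
case class Launched(taskId: Int, slaveId: String)      // stand-in for TaskDescription
case class Status(taskId: Int, finished: Boolean)      // stand-in for TaskStatus

trait Job {
  // Offered one slave's free resources; return a task to launch there, or None.
  def slaveOffer(offer: Offer, availableCpus: Int, availableMem: Int): Option[Launched]
  // Notified of a state change for one of this job's tasks.
  def statusUpdate(status: Status): Unit
  // Notified of a fatal scheduler error.
  def error(code: Int, message: String): Unit
}

// A Job that wants `numTasks` identical one-CPU tasks, on any slave.
class ToyJob(val jobId: Int, numTasks: Int, var nextTaskId: Int) extends Job {
  private var launched = 0
  private var finished = 0

  def slaveOffer(offer: Offer, cpus: Int, mem: Int): Option[Launched] =
    if (launched < numTasks && cpus >= 1) {
      val tid = nextTaskId
      nextTaskId += 1
      launched += 1
      println(s"Starting task as jobId $jobId, TID $tid on slave ${offer.slaveId}")
      Some(Launched(tid, offer.slaveId))
    } else None

  def statusUpdate(status: Status): Unit =
    if (status.finished) {
      finished += 1
      println(s"Finished job $jobId TID ${status.taskId} (progress: $finished/$numTasks)")
    }

  def error(code: Int, message: String): Unit =
    println(s"Job $jobId failed: error $code: $message")
}

object Demo extends App {
  val activeJobsQueue = new Queue[Int]
  val activeJobs = new HashMap[Int, Job]
  activeJobs(0) = new ToyJob(jobId = 0, numTasks = 2, nextTaskId = 100)
  activeJobsQueue += 0

  // Same loop shape as MesosScheduler.resourceOffer: keep re-offering each
  // slave to the queued jobs until no job accepts, decrementing the tracked
  // per-slave resources as tasks are placed.
  val offers = Array(Offer("slave-1", cpus = 2, mem = 1024))
  val availableCpus = offers.map(_.cpus)
  var launchedTask = true
  while (launchedTask) {
    launchedTask = false
    for (jobId <- activeJobsQueue; i <- offers.indices) {
      activeJobs(jobId).slaveOffer(offers(i), availableCpus(i), offers(i).mem) match {
        case Some(_) =>
          availableCpus(i) -= 1
          launchedTask = true
        case None =>
      }
    }
  }

  // Later, the driver reports completions through statusUpdate.
  activeJobs(0).statusUpdate(Status(100, finished = true))
  activeJobs(0).statusUpdate(Status(101, finished = true))
}

Note how slaveOffer sees one slave at a time, so a job can only accept or decline the offer in
front of it; that is exactly the limitation TODO #2 at the top of the patch calls out, and why
it suggests passing all the offers to the Job and letting it load-balance.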