From bf21bb28f3f1fef1445f6413bb13d74888842906 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sat, 16 Oct 2010 11:57:36 -0700
Subject: [PATCH] Further clarified some code
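
Make the scheduler's bookkeeping fields (isRegistered, registeredLock,
activeJobs, activeJobsQueue, taskIdToJobId) private, document the
resourceOffer callback, and factor the terminal-state check out into an
isFinished helper. Also move the taskIdToJobId cleanup out of SimpleJob:
the scheduler now removes the mapping itself in statusUpdate whenever a
task reaches a terminal state, instead of each job reaching into the
scheduler's map.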

---
 src/scala/spark/MesosScheduler.scala | 27 +++++++++++++++++++++------
 src/scala/spark/SimpleJob.scala      |  5 +----
 2 files changed, 22 insertions(+), 10 deletions(-)
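
Note: the doc comment added to resourceOffer describes a FIFO-over-jobs,
round-robin-over-offers policy. A minimal sketch of that shape, for
orientation only; Offer, Task, Job, and slaveOffer here are hypothetical
stand-ins, not the actual Mesos or Spark types:

    // Sketch (hypothetical types): each pass hands every offer to the
    // first job, in arrival order, that still wants a task on it; passes
    // repeat until nothing launches, so tasks spread across slaves rather
    // than saturating one node first. Jobs must return None once they
    // have no pending work for a slave, or the loop would not terminate.
    trait Offer
    trait Task
    trait Job { def slaveOffer(o: Offer): Option[Task] }

    def fillOffers(offers: Seq[Offer], jobsInFifoOrder: Seq[Job]): Seq[Task] = {
      val launched = collection.mutable.ArrayBuffer[Task]()
      var launchedTask = true
      while (launchedTask) {
        launchedTask = false
        for (offer <- offers) {                 // round-robin across slaves
          val task = jobsInFifoOrder.iterator   // FIFO over jobs
            .map(_.slaveOffer(offer))
            .find(_.isDefined)
            .flatten
          task.foreach { t => launched += t; launchedTask = true }
        }
      }
      launched.toSeq
    }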

diff --git a/src/scala/spark/MesosScheduler.scala b/src/scala/spark/MesosScheduler.scala
index 8a713d6f2b..5adff032eb 100644
--- a/src/scala/spark/MesosScheduler.scala
+++ b/src/scala/spark/MesosScheduler.scala
@@ -29,13 +29,13 @@ extends MScheduler with spark.Scheduler with Logging
   )
 
   // Lock used to wait for the scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
+  private var isRegistered = false
+  private val registeredLock = new Object()
 
-  var activeJobs = new HashMap[Int, Job]
-  var activeJobsQueue = new Queue[Job]
+  private var activeJobs = new HashMap[Int, Job]
+  private var activeJobsQueue = new Queue[Job]
 
-  private[spark] var taskIdToJobId = new HashMap[Int, Int]
+  private var taskIdToJobId = new HashMap[Int, Int]
 
   private var nextJobId = 0
   
@@ -126,6 +126,11 @@ extends MScheduler with spark.Scheduler with Logging
     }
   }
 
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by asking
+   * our active jobs for tasks in FIFO order. We fill each node with tasks in
+   * a round-robin manner so that tasks are balanced across the cluster.
+   */
   override def resourceOffer(
       d: SchedulerDriver, oid: String, offers: JList[SlaveOffer]) {
     synchronized {
@@ -159,6 +164,14 @@ extends MScheduler with spark.Scheduler with Logging
     }
   }
 
+  // Check whether a Mesos task state represents a terminal (finished,
+  // failed, killed, or lost) task
+  private def isFinished(state: TaskState) = {
+    state == TaskState.TASK_FINISHED ||
+    state == TaskState.TASK_FAILED ||
+    state == TaskState.TASK_KILLED ||
+    state == TaskState.TASK_LOST
+  }
+
   override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
     synchronized {
       try {
@@ -167,10 +180,12 @@ extends MScheduler with spark.Scheduler with Logging
             if (activeJobs.contains(jobId)) {
               activeJobs(jobId).statusUpdate(status)
             }
+            if (isFinished(status.getState)) {
+              taskIdToJobId.remove(status.getTaskId)
+            }
           case None =>
             logInfo("TID " + status.getTaskId + " already finished")
         }
-
       } catch {
         case e: Exception => logError("Exception in statusUpdate", e)
       }
diff --git a/src/scala/spark/SimpleJob.scala b/src/scala/spark/SimpleJob.scala
index faf3b1c492..b15d0522d4 100644
--- a/src/scala/spark/SimpleJob.scala
+++ b/src/scala/spark/SimpleJob.scala
@@ -9,7 +9,7 @@ import mesos._
 
 
 /**
- * A simple implementation of Job that just runs each task in an array.
+ * A Job that runs a set of tasks with no interdependencies.
  */
 class SimpleJob[T: ClassManifest](
   sched: MesosScheduler, tasks: Array[Task[T]], val jobId: Int)
@@ -204,8 +204,6 @@ extends Job(jobId) with Logging
       Accumulators.add(callingThread, result.accumUpdates)
       // Mark finished and stop if we've finished all the tasks
       finished(index) = true
-      // Remove TID -> jobId mapping from sched
-      sched.taskIdToJobId.remove(tid)
       if (tasksFinished == numTasks)
         setAllFinished()
     } else {
@@ -220,7 +218,6 @@ extends Job(jobId) with Logging
     if (!finished(index)) {
       logInfo("Lost TID %d (task %d:%d)".format(tid, jobId, index))
       launched(index) = false
-      sched.taskIdToJobId.remove(tid)
       tasksLaunched -= 1
       // Re-enqueue the task as pending
       addPendingTask(index)
-- 
GitLab