From 1aa63f775b9ecfa5225449de5cac8427c0e90d54 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Sun, 8 Jul 2012 10:52:13 -0700
Subject: [PATCH] Added back coarse-grained Mesos scheduler based on
 StandaloneScheduler.

---
 core/src/main/scala/spark/SparkContext.scala  |  22 +-
 .../executor/StandaloneExecutorBackend.scala  |  14 +-
 .../scheduler/cluster/ClusterScheduler.scala  |   1 -
 .../cluster/StandaloneClusterMessage.scala    |   2 +-
 .../cluster/StandaloneSchedulerBackend.scala  |  52 ++-
 .../scheduler/cluster/TaskDescription.scala   |   6 +-
 .../scheduler/cluster/TaskSetManager.scala    |   5 +-
 .../mesos/CoarseMesosScheduler.scala          | 371 ------------------
 .../mesos/CoarseMesosSchedulerBackend.scala   | 251 ++++++++++++
 .../mesos/MesosSchedulerBackend.scala         |  10 +-
 .../scala/spark/util/SerializableBuffer.scala |   2 +-
 11 files changed, 320 insertions(+), 416 deletions(-)
 delete mode 100644 core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
 create mode 100644 core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8a06642426..22e1d52f65 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 
-import org.apache.mesos.MesosNativeLibrary
+import org.apache.mesos.{Scheduler, MesosNativeLibrary}
 
 import spark.broadcast._
 
@@ -41,8 +41,8 @@ import spark.scheduler.ShuffleMapTask
 import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosSchedulerBackend
+import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler}
+import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -89,17 +89,15 @@ class SparkContext(
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
         MesosNativeLibrary.load()
-        val sched = new ClusterScheduler(this)
-        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
-        sched.initialize(schedContext)
-        sched
-        /*
-        if (System.getProperty("spark.mesos.coarse", "false") == "true") {
-          new CoarseMesosScheduler(this, master, frameworkName)
+        val scheduler = new ClusterScheduler(this)
+        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val backend = if (coarseGrained) {
+          new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName)
         } else {
-          new MesosSchedulerBackend(this, master, frameworkName)
+          new MesosSchedulerBackend(scheduler, this, master, frameworkName)
         }
-        */
+        scheduler.initialize(backend)
+        scheduler
     }
   }
   taskScheduler.start()
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index b717ed2b77..26b163de0a 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -7,11 +7,11 @@ import spark.util.AkkaUtils
 import akka.actor.{ActorRef, Actor, Props}
 import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
 import akka.remote.RemoteClientLifeCycleEvent
-import spark.scheduler.standalone._
-import spark.scheduler.standalone.RegisteredSlave
-import spark.scheduler.standalone.LaunchTask
-import spark.scheduler.standalone.RegisterSlaveFailed
-import spark.scheduler.standalone.RegisterSlave
+import spark.scheduler.cluster._
+import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.LaunchTask
+import spark.scheduler.cluster.RegisterSlaveFailed
+import spark.scheduler.cluster.RegisterSlave
 
 
 class StandaloneExecutorBackend(
@@ -31,6 +31,7 @@ class StandaloneExecutorBackend(
 
   override def preStart() {
     try {
+      logInfo("Connecting to master: " + masterUrl)
       master = context.actorFor(masterUrl)
       master ! RegisterSlave(slaveId, hostname, cores)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
@@ -51,7 +52,8 @@ class StandaloneExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(slaveId_, taskDesc) =>
+    case LaunchTask(taskDesc) =>
+      logInfo("Got assigned task " + taskDesc.taskId)
       executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
   }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7f1664b483..5b59479682 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -108,7 +108,6 @@ class ClusterScheduler(sc: SparkContext)
     }
   }
 
-
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 7f19fe0cc5..80e8733671 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -7,7 +7,7 @@ import spark.util.SerializableBuffer
 sealed trait StandaloneClusterMessage extends Serializable
 
 // Master to slaves
-case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
 case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
 case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 1acf9e86de..c3132abd7a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -1,6 +1,6 @@
 package spark.scheduler.cluster
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import akka.actor.{Props, Actor, ActorRef, ActorSystem}
 import akka.util.duration._
@@ -21,19 +21,24 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
 
-  class MasterActor extends Actor {
+  class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
     val slaveActor = new HashMap[String, ActorRef]
     val slaveHost = new HashMap[String, String]
     val freeCores = new HashMap[String, Int]
 
     def receive = {
       case RegisterSlave(slaveId, host, cores) =>
-        slaveActor(slaveId) = sender
-        logInfo("Registered slave: " + sender + " with ID " + slaveId)
-        slaveHost(slaveId) = host
-        freeCores(slaveId) = cores
-        totalCoreCount.addAndGet(cores)
-        makeOffers()
+        if (slaveActor.contains(slaveId)) {
+          sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId)
+        } else {
+          logInfo("Registered slave: " + sender + " with ID " + slaveId)
+          sender ! RegisteredSlave(sparkProperties)
+          slaveActor(slaveId) = sender
+          slaveHost(slaveId) = host
+          freeCores(slaveId) = cores
+          totalCoreCount.addAndGet(cores)
+          makeOffers()
+        }
 
       case StatusUpdate(slaveId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
@@ -42,10 +47,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
           makeOffers(slaveId)
         }
 
-      case LaunchTask(slaveId, task) =>
-        freeCores(slaveId) -= 1
-        slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
       case ReviveOffers =>
         makeOffers()
 
@@ -58,14 +59,22 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
 
     // Make fake resource offers on all slaves
     def makeOffers() {
-      scheduler.resourceOffers(
-        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+      launchTasks(scheduler.resourceOffers(
+        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
     }
 
     // Make fake resource offers on just one slave
     def makeOffers(slaveId: String) {
-      scheduler.resourceOffers(
-        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+      launchTasks(scheduler.resourceOffers(
+        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId)))))
+    }
+
+    // Launch tasks returned by a set of resource offers
+    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+      for (task <- tasks.flatten) {
+        freeCores(task.slaveId) -= 1
+        slaveActor(task.slaveId) ! LaunchTask(task)
+      }
     }
   }
 
@@ -73,8 +82,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   val taskIdsOnSlave = new HashMap[String, HashSet[String]]
 
   def start() {
+    val properties = new ArrayBuffer[(String, String)]
+    val iterator = System.getProperties.entrySet.iterator
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+      if (key.startsWith("spark.")) {
+        properties += ((key, value))
+      }
+    }
     masterActor = actorSystem.actorOf(
-      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+      Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
   }
 
   def stop() {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index b0b3cbe7d5..f9a1b74fa5 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -3,7 +3,11 @@ package spark.scheduler.cluster
 import java.nio.ByteBuffer
 import spark.util.SerializableBuffer
 
-class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+class TaskDescription(
+    val taskId: Long,
+    val slaveId: String,
+    val name: String,
+    _serializedTask: ByteBuffer)
   extends Serializable {
 
   // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index ab07f1c8c2..be24316e80 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -191,7 +191,7 @@ class TaskSetManager(
   def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
-      var localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
+      val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
 
       findTask(host, localOnly) match {
         case Some(index) => {
@@ -218,7 +218,7 @@ class TaskSetManager(
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
-          return Some(new TaskDescription(taskId, taskName, serializedTask))
+          return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
         }
         case _ =>
       }
@@ -227,7 +227,6 @@ class TaskSetManager(
   }
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData)
     state match {
       case TaskState.FINISHED =>
         taskFinished(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
deleted file mode 100644
index 0a6e1350be..0000000000
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-package spark.scheduler.mesos
-
-/*
-import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.{ArrayList => JArrayList}
-import java.util.{List => JList}
-import java.util.{HashMap => JHashMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-import scala.collection.mutable.PriorityQueue
-import scala.collection.JavaConversions._
-import scala.math.Ordering
-
-import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
-import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
-import akka.util.duration._
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MTaskInfo, TaskState => MesosTaskState, _}
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler}
-
-
-sealed trait CoarseMesosSchedulerMessage
-case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage
-case class StatusUpdate(slaveId: String, status: TaskStatus) extends CoarseMesosSchedulerMessage
-case class LaunchTask(slaveId: String, task: MTaskInfo) extends CoarseMesosSchedulerMessage
-case class ReviveOffers() extends CoarseMesosSchedulerMessage
-
-case class FakeOffer(slaveId: String, host: String, cores: Int)
-
-/**
- * Mesos scheduler that uses coarse-grained tasks and does its own fine-grained scheduling inside
- * them using Akka actors for messaging. Clients should first call start(), then submit task sets
- * through the runTasks method.
- *
- * TODO: This is a pretty big hack for now.
- */
-class CoarseMesosScheduler(
-    sc: SparkContext,
-    master: String,
-    frameworkName: String)
-  extends ClusterScheduler(sc, master, frameworkName) {
-
-  val actorSystem = sc.env.actorSystem
-  val actorName = "CoarseMesosScheduler"
-  val coresPerSlave = System.getProperty("spark.coarseMesosScheduler.coresPerSlave", "4").toInt
-
-  class MasterActor extends Actor {
-    val slaveActor = new HashMap[String, ActorRef]
-    val slaveHost = new HashMap[String, String]
-    val freeCores = new HashMap[String, Int]
-   
-    def receive = {
-      case RegisterSlave(slaveId, host) =>
-        slaveActor(slaveId) = sender
-        logInfo("Registered slave: " + sender + " with ID " + slaveId)
-        slaveHost(slaveId) = host
-        freeCores(slaveId) = coresPerSlave
-        makeFakeOffers()
-
-      case StatusUpdate(slaveId, status) =>
-        fakeStatusUpdate(status)
-        if (isFinished(status.getState)) {
-          freeCores(slaveId) += 1
-          makeFakeOffers(slaveId)
-        }
-
-      case LaunchTask(slaveId, task) =>
-        freeCores(slaveId) -= 1
-        slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
-      case ReviveOffers() =>
-        logInfo("Reviving offers")
-        makeFakeOffers()
-    }
-
-    // Make fake resource offers for all slaves
-    def makeFakeOffers() {
-      fakeResourceOffers(slaveHost.toSeq.map{case (id, host) => FakeOffer(id, host, freeCores(id))})
-    }
-
-    // Make fake resource offers for all slaves
-    def makeFakeOffers(slaveId: String) {
-      fakeResourceOffers(Seq(FakeOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
-    }
-  }
-
-  val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName)
-
-  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets 
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      val tasks = offers.map(o => new JArrayList[MTaskInfo])
-      for (i <- 0 until offers.size) {
-        val o = offers.get(i)
-        val slaveId = o.getSlaveId.getValue
-        if (!slaveIdToHost.contains(slaveId)) {
-          slaveIdToHost(slaveId) = o.getHostname
-          hostsAlive += o.getHostname
-          taskIdsOnSlave(slaveId) = new HashSet[String]
-          // Launch an infinite task on the node that will talk to the MasterActor to get fake tasks
-          val cpuRes = Resource.newBuilder()
-              .setName("cpus")
-              .setType(Value.Type.SCALAR)
-              .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-              .build()
-          val task = new WorkerTask(slaveId, o.getHostname)
-          val serializedTask = Utils.serialize(task)
-          tasks(i).add(MTaskInfo.newBuilder()
-              .setTaskId(newTaskId())
-              .setSlaveId(o.getSlaveId)
-              .setExecutor(executorInfo)
-              .setName("worker task")
-              .addResources(cpuRes)
-              .setData(ByteString.copyFrom(serializedTask))
-              .build())
-        }
-      }
-      val filters = Filters.newBuilder().setRefuseSeconds(10).build()
-      for (i <- 0 until offers.size) {
-        d.launchTasks(offers(i).getId(), tasks(i), filters)
-      }
-    }
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val tid = status.getTaskId.getValue
-    var taskSetToUpdate: Option[TaskSetManager] = None
-    var taskFailed = false
-    synchronized {
-      try {
-        taskIdToTaskSetId.get(tid) match {
-          case Some(taskSetId) =>
-            if (activeTaskSets.contains(taskSetId)) {
-              //activeTaskSets(taskSetId).statusUpdate(status)
-              taskSetToUpdate = Some(activeTaskSets(taskSetId))
-            }
-            if (isFinished(status.getState)) {
-              taskIdToTaskSetId.remove(tid)
-              if (taskSetTaskIds.contains(taskSetId)) {
-                taskSetTaskIds(taskSetId) -= tid
-              }
-              val slaveId = taskIdToSlaveId(tid)
-              taskIdToSlaveId -= tid
-              taskIdsOnSlave(slaveId) -= tid
-            }
-            if (status.getState == MesosTaskState.TASK_FAILED) {
-              taskFailed = true
-            }
-          case None =>
-            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
-        }
-      } catch {
-        case e: Exception => logError("Exception in statusUpdate", e)
-      }
-    }
-    // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
-    if (taskSetToUpdate != None) {
-      taskSetToUpdate.get.statusUpdate(status)
-    }
-    if (taskFailed) {
-      // Revive offers if a task had failed for some reason other than host lost
-      reviveOffers()
-    }
-  }
-
-  override def slaveLost(d: SchedulerDriver, s: SlaveID) {
-    logInfo("Slave lost: " + s.getValue)
-    var failedHost: Option[String] = None
-    var lostTids: Option[HashSet[String]] = None
-    synchronized {
-      val slaveId = s.getValue
-      val host = slaveIdToHost(slaveId)
-      if (hostsAlive.contains(host)) {
-        slaveIdsWithExecutors -= slaveId
-        hostsAlive -= host
-        failedHost = Some(host)
-        lostTids = Some(taskIdsOnSlave(slaveId))
-        logInfo("failedHost: " + host)
-        logInfo("lostTids: " + lostTids)
-        taskIdsOnSlave -= slaveId
-        activeTaskSetsQueue.foreach(_.hostLost(host))
-      }
-    }
-    if (failedHost != None) {
-      // Report all the tasks on the failed host as lost, without holding a lock on this
-      for (tid <- lostTids.get; taskSetId <- taskIdToTaskSetId.get(tid)) {
-        // TODO: Maybe call our statusUpdate() instead to clean our internal data structures
-        activeTaskSets(taskSetId).statusUpdate(TaskStatus.newBuilder()
-          .setTaskId(TaskID.newBuilder().setValue(tid).build())
-          .setState(MesosTaskState.TASK_LOST)
-          .build())
-      }
-      // Also report the loss to the DAGScheduler
-      listener.hostLost(failedHost.get)
-      reviveOffers()
-    }
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  // Check for speculatable tasks in all our active jobs.
-  override def checkSpeculatableTasks() {
-    var shouldRevive = false
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        shouldRevive |= ts.checkSpeculatableTasks()
-      }
-    }
-    if (shouldRevive) {
-      reviveOffers()
-    }
-  }
-
-
-  val lock2 = new Object
-  var firstWait = true
-
-  override def waitForRegister() {
-    lock2.synchronized {
-      if (firstWait) {
-        super.waitForRegister()
-        Thread.sleep(5000)
-        firstWait = false
-      }
-    }
-  }
-
-  def fakeStatusUpdate(status: TaskStatus) {
-    statusUpdate(driver, status)
-  }
-
-  def fakeResourceOffers(offers: Seq[FakeOffer]) {
-    logDebug("fakeResourceOffers: " + offers)
-    val availableCpus = offers.map(_.cores.toDouble).toArray
-    var launchedTask = false
-    for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
-      do {
-        launchedTask = false
-        for (i <- 0 until offers.size if hostsAlive.contains(offers(i).host)) {
-          manager.slaveOffer(offers(i).slaveId, offers(i).host, availableCpus(i)) match {
-            case Some(task) => 
-              val tid = task.getTaskId.getValue
-              val sid = offers(i).slaveId
-              taskIdToTaskSetId(tid) = manager.taskSet.id
-              taskSetTaskIds(manager.taskSet.id) += tid
-              taskIdToSlaveId(tid) = sid
-              taskIdsOnSlave(sid) += tid
-              slaveIdsWithExecutors += sid
-              availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
-              launchedTask = true
-              masterActor ! LaunchTask(sid, task)
-              
-            case None => {}
-          }
-        }
-      } while (launchedTask)
-    }
-  }
-
-  override def reviveOffers() {
-    masterActor ! ReviveOffers()
-  }
-}
-
-class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
-  generation = 0
-
-  def run(id: Long) {
-    val env = SparkEnv.get
-    val classLoader = Thread.currentThread.getContextClassLoader
-    val actor = env.actorSystem.actorOf(
-      Props(new WorkerActor(slaveId, host, env, classLoader)),
-      name = "WorkerActor")
-    // Wait forever so that our Mesos task doesn't end
-    while (true) {
-      Thread.sleep(10000)
-    }
-  }
-}
-
-class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader)
-  extends Actor with Logging {
-
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
-  val masterIp: String = System.getProperty("spark.master.host", "localhost")
-  val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
-  val masterActor = env.actorSystem.actorFor(
-    "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler"))
-
-  class TaskRunner(desc: MTaskInfo)
-  extends Runnable {
-    override def run() {
-      val tid = desc.getTaskId.getValue
-      logInfo("Running task ID " + tid)
-      try {
-        SparkEnv.set(env)
-        Thread.currentThread.setContextClassLoader(classLoader)
-        Accumulators.clear
-        val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
-        env.mapOutputTracker.updateGeneration(task.generation)
-        val value = task.run(tid.toInt)
-        val accumUpdates = Accumulators.values
-        val result = new TaskResult(value, accumUpdates)
-        masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-            .setTaskId(desc.getTaskId)
-            .setState(MesosTaskState.TASK_FINISHED)
-            .setData(ByteString.copyFrom(Utils.serialize(result)))
-            .build())
-        logInfo("Finished task ID " + tid)
-      } catch {
-        case ffe: FetchFailedException => {
-          val reason = ffe.toTaskEndReason
-          masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-              .setTaskId(desc.getTaskId)
-              .setState(MesosTaskState.TASK_FAILED)
-              .setData(ByteString.copyFrom(Utils.serialize(reason)))
-              .build())
-        }
-        case t: Throwable => {
-          val reason = ExceptionFailure(t)
-          masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-              .setTaskId(desc.getTaskId)
-              .setState(MesosTaskState.TASK_FAILED)
-              .setData(ByteString.copyFrom(Utils.serialize(reason)))
-              .build())
-
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + tid, t)
-          //System.exit(1)
-        }
-      }
-    }
-  }
-
-  override def preStart {
-    logInfo("Registering with master")
-    masterActor ! RegisterSlave(slaveId, host)
-  }
-
-  override def receive = {
-    case LaunchTask(slaveId_, task) =>
-      threadPool.execute(new TaskRunner(task))    
-  }
-}
-
-*/
\ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
new file mode 100644
index 0000000000..040cd6b335
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -0,0 +1,251 @@
+package spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import spark.TaskState
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
+ *
+ * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
+ * remove this.
+ */
+class CoarseMesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    frameworkName: String)
+  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  with MScheduler
+  with Logging {
+
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH",
+    "SPARK_JAVA_OPTS"
+  )
+
+  // Memory used by each executor (in megabytes)
+  val EXECUTOR_MEMORY = {
+    if (System.getenv("SPARK_MEM") != null) {
+      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    } else {
+      512
+    }
+  }
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+  // Cores we have acquired with each Mesos task ID
+  val coresByTaskId = new HashMap[Int, Int]
+  var totalCoresAcquired = 0
+
+  val slaveIdsWithExecutors = new HashSet[String]
+
+  val sparkHome = sc.getSparkHome() match {
+    case Some(path) =>
+      path
+    case None =>
+      throw new SparkException("Spark home is not set; set it through the spark.home system " +
+        "property, the SPARK_HOME environment variable or the SparkContext constructor")
+  }
+
+  var nextMesosTaskId = 0
+
+  def newMesosTaskId(): Int = {
+    val id = nextMesosTaskId
+    nextMesosTaskId += 1
+    id
+  }
+
+  override def start() {
+    super.start()
+
+    synchronized {
+      new Thread("CoarseMesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = CoarseMesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try { {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          }
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+    val runScript = new File(sparkHome, "run").getCanonicalPath
+    val masterUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+      runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
+    val environment = Environment.newBuilder()
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null) {
+        environment.addVariables(Environment.Variable.newBuilder()
+          .setName(key)
+          .setValue(System.getenv(key))
+          .build())
+      }
+    }
+    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    logInfo("Registered as framework ID " + frameworkId.getValue)
+    registeredLock.synchronized {
+      isRegistered = true
+      registeredLock.notifyAll()
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * unless we've already launched more than we wanted to.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    synchronized {
+      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+      for (offer <- offers) {
+        val slaveId = offer.getSlaveId.toString
+        val mem = getResource(offer.getResourcesList, "mem")
+        val cpus = getResource(offer.getResourcesList, "cpus").toInt
+        if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 &&
+            !slaveIdsWithExecutors.contains(slaveId)) {
+          // Launch an executor on the slave
+          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          val taskId = newMesosTaskId()
+          val task = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, cpusToUse))
+            .setName("Task " + taskId)
+            .addResources(createResource("cpus", cpusToUse))
+            .addResources(createResource("mem", EXECUTOR_MEMORY))
+            .build()
+          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+        } else {
+          // Filter it out
+          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+        }
+      }
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Build a Mesos resource protobuf object */
+  def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val taskId = status.getTaskId.getValue.toInt
+    logInfo("Mesos task " + taskId + " is now " + status.getState)
+    synchronized {
+      if (isFinished(status.getState)) {
+        // Remove the cores we have remembered for this task, if it's in the hashmap
+        for (cores <- coresByTaskId.get(taskId)) {
+          totalCoresAcquired -= cores
+          coresByTaskId -= taskId
+          driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+        }
+      }
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    super.stop()
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    synchronized {
+      slaveIdsWithExecutors -= slaveId.getValue
+    }
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+    slaveLost(d, s)
+  }
+}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 110b178582..44eda93dd1 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,6 +15,11 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 import spark.TaskState
 
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
 class MesosSchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
@@ -60,11 +65,10 @@ class MesosSchedulerBackend(
     synchronized {
       new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
-
         override def run() {
-          val sched = MesosSchedulerBackend.this
+          val scheduler = MesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
-          driver = new MesosSchedulerDriver(sched, fwInfo, master)
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
           try {
             val ret = driver.run()
             logInfo("driver.run() returned with code " + ret)
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index fa9e6d62cb..0830843a77 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,7 +8,7 @@ import java.nio.channels.Channels
  * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
  * it easier to pass ByteBuffers in case class messages.
  */
-class SerializableBuffer(@transient var buffer: ByteBuffer) {
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
   def value = buffer
 
   private def readObject(in: ObjectInputStream) {
-- 
GitLab