diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 8a06642426ba2d4d80e4d3d5e8f942207f8c86eb..22e1d52f65d3fafa6504ce29d6e4ad82e94137bc 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 
-import org.apache.mesos.MesosNativeLibrary
+import org.apache.mesos.{Scheduler, MesosNativeLibrary}
 
 import spark.broadcast._
 
@@ -41,8 +41,8 @@ import spark.scheduler.ShuffleMapTask
 import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosSchedulerBackend
+import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler}
+import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -89,17 +89,15 @@ class SparkContext(
         new LocalScheduler(threads.toInt, maxFailures.toInt)
       case _ =>
         MesosNativeLibrary.load()
-        val sched = new ClusterScheduler(this)
-        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
-        sched.initialize(schedContext)
-        sched
-        /*
-        if (System.getProperty("spark.mesos.coarse", "false") == "true") {
-          new CoarseMesosScheduler(this, master, frameworkName)
+        val scheduler = new ClusterScheduler(this)
+        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+        val backend = if (coarseGrained) {
+          new CoarseMesosSchedulerBackend(scheduler, this, master, frameworkName)
         } else {
-          new MesosSchedulerBackend(this, master, frameworkName)
+          new MesosSchedulerBackend(scheduler, this, master, frameworkName)
         }
-        */
+        scheduler.initialize(backend)
+        scheduler
     }
   }
   taskScheduler.start()
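
Illustrative sketch (not part of the patch): with this change the Mesos mode is selected entirely by the "spark.mesos.coarse" system property, which defaults to "false" and is parsed with .toBoolean, while "spark.cores.max" (read by the new coarse backend) caps how many cores it will acquire. Assuming the usual master-URL/job-name SparkContext constructor and placeholder values, a driver would opt in like this:

    // Hypothetical driver snippet; only the two property names come from this patch.
    import spark.SparkContext

    System.setProperty("spark.mesos.coarse", "true")   // pick CoarseMesosSchedulerBackend
    System.setProperty("spark.cores.max", "8")         // optional cap on total cores acquired
    val sc = new SparkContext("<mesos-master>:5050", "ExampleJob")
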
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index b717ed2b773d6b3d628594a7e7a39215315c0574..26b163de0af894a049b7719c2056c3f6ec0012c6 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -7,11 +7,11 @@ import spark.util.AkkaUtils
 import akka.actor.{ActorRef, Actor, Props}
 import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
 import akka.remote.RemoteClientLifeCycleEvent
-import spark.scheduler.standalone._
-import spark.scheduler.standalone.RegisteredSlave
-import spark.scheduler.standalone.LaunchTask
-import spark.scheduler.standalone.RegisterSlaveFailed
-import spark.scheduler.standalone.RegisterSlave
+import spark.scheduler.cluster._
+import spark.scheduler.cluster.RegisteredSlave
+import spark.scheduler.cluster.LaunchTask
+import spark.scheduler.cluster.RegisterSlaveFailed
+import spark.scheduler.cluster.RegisterSlave
 
 
 class StandaloneExecutorBackend(
@@ -31,6 +31,7 @@ class StandaloneExecutorBackend(
 
   override def preStart() {
     try {
+      logInfo("Connecting to master: " + masterUrl)
       master = context.actorFor(masterUrl)
       master ! RegisterSlave(slaveId, hostname, cores)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
@@ -51,7 +52,8 @@ class StandaloneExecutorBackend(
       logError("Slave registration failed: " + message)
       System.exit(1)
 
-    case LaunchTask(slaveId_, taskDesc) =>
+    case LaunchTask(taskDesc) =>
+      logInfo("Got assigned task " + taskDesc.taskId)
       executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
   }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7f1664b483739fcf590bef1d537956890a00d872..5b59479682f2b48cef2d93da39f75021b1552ca9 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -108,7 +108,6 @@ class ClusterScheduler(sc: SparkContext)
     }
   }
 
-
   /**
   * Called by cluster manager to offer resources on slaves. We respond by asking our active task
   * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 7f19fe0cc514078c0cf4b55f4b769106284bb3fa..80e8733671b4a5788502bf6831cab166b97aeb7b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -7,7 +7,7 @@ import spark.util.SerializableBuffer
 sealed trait StandaloneClusterMessage extends Serializable
 
 // Master to slaves
-case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
+case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
 case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
 case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 1acf9e86ded6353bc1b94189d1ddf5bfaf0f78f0..c3132abd7a90e9ca2970c320cf9bc9b3564fd6f7 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -1,6 +1,6 @@
 package spark.scheduler.cluster
 
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import akka.actor.{Props, Actor, ActorRef, ActorSystem}
 import akka.util.duration._
@@ -21,19 +21,24 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
 
-  class MasterActor extends Actor {
+  class MasterActor(sparkProperties: Seq[(String, String)]) extends Actor {
     val slaveActor = new HashMap[String, ActorRef]
     val slaveHost = new HashMap[String, String]
     val freeCores = new HashMap[String, Int]
 
     def receive = {
       case RegisterSlave(slaveId, host, cores) =>
-        slaveActor(slaveId) = sender
-        logInfo("Registered slave: " + sender + " with ID " + slaveId)
-        slaveHost(slaveId) = host
-        freeCores(slaveId) = cores
-        totalCoreCount.addAndGet(cores)
-        makeOffers()
+        if (slaveActor.contains(slaveId)) {
+          sender ! RegisterSlaveFailed("Duplicate slave ID: " + slaveId)
+        } else {
+          logInfo("Registered slave: " + sender + " with ID " + slaveId)
+          sender ! RegisteredSlave(sparkProperties)
+          slaveActor(slaveId) = sender
+          slaveHost(slaveId) = host
+          freeCores(slaveId) = cores
+          totalCoreCount.addAndGet(cores)
+          makeOffers()
+        }
 
       case StatusUpdate(slaveId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
@@ -42,10 +47,6 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
           makeOffers(slaveId)
         }
 
-      case LaunchTask(slaveId, task) =>
-        freeCores(slaveId) -= 1
-        slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
       case ReviveOffers =>
         makeOffers()
 
@@ -58,14 +59,22 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
 
     // Make fake resource offers on all slaves
     def makeOffers() {
-      scheduler.resourceOffers(
-        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
+      launchTasks(scheduler.resourceOffers(
+        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
     }
 
     // Make fake resource offers on just one slave
    def makeOffers(slaveId: String) {
-      scheduler.resourceOffers(
-        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
+      launchTasks(scheduler.resourceOffers(
+        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId)))))
+    }
+
+    // Launch tasks returned by a set of resource offers
+    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+      for (task <- tasks.flatten) {
+        freeCores(task.slaveId) -= 1
+        slaveActor(task.slaveId) ! LaunchTask(task)
+      }
     }
   }
 
@@ -73,8 +82,17 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
   val taskIdsOnSlave = new HashMap[String, HashSet[String]]
 
   def start() {
+    val properties = new ArrayBuffer[(String, String)]
+    val iterator = System.getProperties.entrySet.iterator
+    while (iterator.hasNext) {
+      val entry = iterator.next
+      val (key, value) = (entry.getKey.toString, entry.getValue.toString)
+      if (key.startsWith("spark.")) {
+        properties += ((key, value))
+      }
+    }
     masterActor = actorSystem.actorOf(
-      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
+      Props(new MasterActor(properties)), name = StandaloneSchedulerBackend.ACTOR_NAME)
   }
 
   def stop() {
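
Hedged note (not part of the patch): the master actor is now constructed with the driver's "spark." system properties and returns them to each slave in the RegisteredSlave reply, so executors can start with the same configuration. The slave-side handling is outside this diff; a minimal sketch of what a receiver might do, with the helper name being hypothetical:

    // Hypothetical executor-side helper: apply the properties shipped in RegisteredSlave locally.
    def applySparkProperties(sparkProperties: Seq[(String, String)]) {
      for ((key, value) <- sparkProperties) {
        System.setProperty(key, value)
      }
    }
    // e.g. inside StandaloneExecutorBackend.receive:
    //   case RegisteredSlave(sparkProperties) => applySparkProperties(sparkProperties)
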
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index b0b3cbe7d529cf5c1b9b1e27e1a0e7924bb9fb97..f9a1b74fa5e46b6f57602135972ae6024a47dab3 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -3,7 +3,11 @@ package spark.scheduler.cluster
 import java.nio.ByteBuffer
 import spark.util.SerializableBuffer
 
-class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+class TaskDescription(
+    val taskId: Long,
+    val slaveId: String,
+    val name: String,
+    _serializedTask: ByteBuffer)
   extends Serializable {
 
   // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index ab07f1c8c2e264eab12e9ff8875362e66f0bde57..be24316e804b1363ff1164900405645e77382f15 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -191,7 +191,7 @@ class TaskSetManager(
   def slaveOffer(slaveId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
-      var localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
+      val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
 
       findTask(host, localOnly) match {
         case Some(index) => {
@@ -218,7 +218,7 @@ class TaskSetManager(
           logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
             taskSet.id, index, serializedTask.limit, timeTaken))
           val taskName = "task %s:%d".format(taskSet.id, index)
-          return Some(new TaskDescription(taskId, taskName, serializedTask))
+          return Some(new TaskDescription(taskId, slaveId, taskName, serializedTask))
         }
         case _ =>
       }
@@ -227,7 +227,6 @@ class TaskSetManager(
   }
 
   def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    logInfo("statusUpdate: " + tid + " is now " + state + " " + serializedData)
     state match {
       case TaskState.FINISHED =>
         taskFinished(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
deleted file mode 100644
index 0a6e1350befde8ca49a6b19b757f9b81e54696bc..0000000000000000000000000000000000000000
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosScheduler.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-package spark.scheduler.mesos
-
-/*
-import java.io.{File, FileInputStream, FileOutputStream}
-import java.util.{ArrayList => JArrayList}
-import java.util.{List => JList}
-import java.util.{HashMap => JHashMap}
-import java.util.concurrent._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Map
-import scala.collection.mutable.PriorityQueue
-import scala.collection.JavaConversions._
-import scala.math.Ordering
-
-import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
-import akka.remote._
-import akka.util.Duration
-import akka.util.Timeout
-import akka.util.duration._
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MTaskInfo, TaskState => MesosTaskState, _}
-
-import spark._
-import spark.scheduler._
-import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler}
-
-
-sealed trait CoarseMesosSchedulerMessage
-case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage
-case class StatusUpdate(slaveId: String, status: TaskStatus) extends CoarseMesosSchedulerMessage
-case class LaunchTask(slaveId: String, task: MTaskInfo) extends CoarseMesosSchedulerMessage
-case class ReviveOffers() extends CoarseMesosSchedulerMessage
-
-case class FakeOffer(slaveId: String, host: String, cores: Int)
-
-/**
- * Mesos scheduler that uses coarse-grained tasks and does its own fine-grained scheduling inside
- * them using Akka actors for messaging. Clients should first call start(), then submit task sets
- * through the runTasks method.
- *
- * TODO: This is a pretty big hack for now.
- */
-class CoarseMesosScheduler(
-    sc: SparkContext,
-    master: String,
-    frameworkName: String)
-  extends ClusterScheduler(sc, master, frameworkName) {
-
-  val actorSystem = sc.env.actorSystem
-  val actorName = "CoarseMesosScheduler"
-  val coresPerSlave = System.getProperty("spark.coarseMesosScheduler.coresPerSlave", "4").toInt
-
-  class MasterActor extends Actor {
-    val slaveActor = new HashMap[String, ActorRef]
-    val slaveHost = new HashMap[String, String]
-    val freeCores = new HashMap[String, Int]
-
-    def receive = {
-      case RegisterSlave(slaveId, host) =>
-        slaveActor(slaveId) = sender
-        logInfo("Registered slave: " + sender + " with ID " + slaveId)
-        slaveHost(slaveId) = host
-        freeCores(slaveId) = coresPerSlave
-        makeFakeOffers()
-
-      case StatusUpdate(slaveId, status) =>
-        fakeStatusUpdate(status)
-        if (isFinished(status.getState)) {
-          freeCores(slaveId) += 1
-          makeFakeOffers(slaveId)
-        }
-
-      case LaunchTask(slaveId, task) =>
-        freeCores(slaveId) -= 1
-        slaveActor(slaveId) ! LaunchTask(slaveId, task)
-
-      case ReviveOffers() =>
-        logInfo("Reviving offers")
-        makeFakeOffers()
-    }
-
-    // Make fake resource offers for all slaves
-    def makeFakeOffers() {
-      fakeResourceOffers(slaveHost.toSeq.map{case (id, host) => FakeOffer(id, host, freeCores(id))})
-    }
-
-    // Make fake resource offers for all slaves
-    def makeFakeOffers(slaveId: String) {
-      fakeResourceOffers(Seq(FakeOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
-    }
-  }
-
-  val masterActor: ActorRef = actorSystem.actorOf(Props(new MasterActor), name = actorName)
-
-  val taskIdsOnSlave = new HashMap[String, HashSet[String]]
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      val tasks = offers.map(o => new JArrayList[MTaskInfo])
-      for (i <- 0 until offers.size) {
-        val o = offers.get(i)
-        val slaveId = o.getSlaveId.getValue
-        if (!slaveIdToHost.contains(slaveId)) {
-          slaveIdToHost(slaveId) = o.getHostname
-          hostsAlive += o.getHostname
-          taskIdsOnSlave(slaveId) = new HashSet[String]
-          // Launch an infinite task on the node that will talk to the MasterActor to get fake tasks
-          val cpuRes = Resource.newBuilder()
-            .setName("cpus")
-            .setType(Value.Type.SCALAR)
-            .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-            .build()
-          val task = new WorkerTask(slaveId, o.getHostname)
-          val serializedTask = Utils.serialize(task)
-          tasks(i).add(MTaskInfo.newBuilder()
-            .setTaskId(newTaskId())
-            .setSlaveId(o.getSlaveId)
-            .setExecutor(executorInfo)
-            .setName("worker task")
-            .addResources(cpuRes)
-            .setData(ByteString.copyFrom(serializedTask))
-            .build())
-        }
-      }
-      val filters = Filters.newBuilder().setRefuseSeconds(10).build()
-      for (i <- 0 until offers.size) {
-        d.launchTasks(offers(i).getId(), tasks(i), filters)
-      }
-    }
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val tid = status.getTaskId.getValue
-    var taskSetToUpdate: Option[TaskSetManager] = None
-    var taskFailed = false
-    synchronized {
-      try {
-        taskIdToTaskSetId.get(tid) match {
-          case Some(taskSetId) =>
-            if (activeTaskSets.contains(taskSetId)) {
-              //activeTaskSets(taskSetId).statusUpdate(status)
-              taskSetToUpdate = Some(activeTaskSets(taskSetId))
-            }
-            if (isFinished(status.getState)) {
-              taskIdToTaskSetId.remove(tid)
-              if (taskSetTaskIds.contains(taskSetId)) {
-                taskSetTaskIds(taskSetId) -= tid
-              }
-              val slaveId = taskIdToSlaveId(tid)
-              taskIdToSlaveId -= tid
-              taskIdsOnSlave(slaveId) -= tid
-            }
-            if (status.getState == MesosTaskState.TASK_FAILED) {
-              taskFailed = true
-            }
-          case None =>
-            logInfo("Ignoring update from TID " + tid + " because its task set is gone")
-        }
-      } catch {
-        case e: Exception => logError("Exception in statusUpdate", e)
-      }
-    }
-    // Update the task set and DAGScheduler without holding a lock on this, because that can deadlock
-    if (taskSetToUpdate != None) {
-      taskSetToUpdate.get.statusUpdate(status)
-    }
-    if (taskFailed) {
-      // Revive offers if a task had failed for some reason other than host lost
-      reviveOffers()
-    }
-  }
-
-  override def slaveLost(d: SchedulerDriver, s: SlaveID) {
-    logInfo("Slave lost: " + s.getValue)
-    var failedHost: Option[String] = None
-    var lostTids: Option[HashSet[String]] = None
-    synchronized {
-      val slaveId = s.getValue
-      val host = slaveIdToHost(slaveId)
-      if (hostsAlive.contains(host)) {
-        slaveIdsWithExecutors -= slaveId
-        hostsAlive -= host
-        failedHost = Some(host)
-        lostTids = Some(taskIdsOnSlave(slaveId))
-        logInfo("failedHost: " + host)
-        logInfo("lostTids: " + lostTids)
-        taskIdsOnSlave -= slaveId
-        activeTaskSetsQueue.foreach(_.hostLost(host))
-      }
-    }
-    if (failedHost != None) {
-      // Report all the tasks on the failed host as lost, without holding a lock on this
-      for (tid <- lostTids.get; taskSetId <- taskIdToTaskSetId.get(tid)) {
-        // TODO: Maybe call our statusUpdate() instead to clean our internal data structures
-        activeTaskSets(taskSetId).statusUpdate(TaskStatus.newBuilder()
-          .setTaskId(TaskID.newBuilder().setValue(tid).build())
-          .setState(MesosTaskState.TASK_LOST)
-          .build())
-      }
-      // Also report the loss to the DAGScheduler
-      listener.hostLost(failedHost.get)
-      reviveOffers()
-    }
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  // Check for speculatable tasks in all our active jobs.
-  override def checkSpeculatableTasks() {
-    var shouldRevive = false
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        shouldRevive |= ts.checkSpeculatableTasks()
-      }
-    }
-    if (shouldRevive) {
-      reviveOffers()
-    }
-  }
-
-
-  val lock2 = new Object
-  var firstWait = true
-
-  override def waitForRegister() {
-    lock2.synchronized {
-      if (firstWait) {
-        super.waitForRegister()
-        Thread.sleep(5000)
-        firstWait = false
-      }
-    }
-  }
-
-  def fakeStatusUpdate(status: TaskStatus) {
-    statusUpdate(driver, status)
-  }
-
-  def fakeResourceOffers(offers: Seq[FakeOffer]) {
-    logDebug("fakeResourceOffers: " + offers)
-    val availableCpus = offers.map(_.cores.toDouble).toArray
-    var launchedTask = false
-    for (manager <- activeTaskSetsQueue.sortBy(m => (m.taskSet.priority, m.taskSet.stageId))) {
-      do {
-        launchedTask = false
-        for (i <- 0 until offers.size if hostsAlive.contains(offers(i).host)) {
-          manager.slaveOffer(offers(i).slaveId, offers(i).host, availableCpus(i)) match {
-            case Some(task) =>
-              val tid = task.getTaskId.getValue
-              val sid = offers(i).slaveId
-              taskIdToTaskSetId(tid) = manager.taskSet.id
-              taskSetTaskIds(manager.taskSet.id) += tid
-              taskIdToSlaveId(tid) = sid
-              taskIdsOnSlave(sid) += tid
-              slaveIdsWithExecutors += sid
-              availableCpus(i) -= getResource(task.getResourcesList(), "cpus")
-              launchedTask = true
-              masterActor ! LaunchTask(sid, task)
-
-            case None => {}
-          }
-        }
-      } while (launchedTask)
-    }
-  }
-
-  override def reviveOffers() {
-    masterActor ! ReviveOffers()
-  }
-}
-
-class WorkerTask(slaveId: String, host: String) extends Task[Unit](-1) {
-  generation = 0
-
-  def run(id: Long) {
-    val env = SparkEnv.get
-    val classLoader = Thread.currentThread.getContextClassLoader
-    val actor = env.actorSystem.actorOf(
-      Props(new WorkerActor(slaveId, host, env, classLoader)),
-      name = "WorkerActor")
-    // Wait forever so that our Mesos task doesn't end
-    while (true) {
-      Thread.sleep(10000)
-    }
-  }
-}
-
-class WorkerActor(slaveId: String, host: String, env: SparkEnv, classLoader: ClassLoader)
-  extends Actor with Logging {
-
-  val threadPool = new ThreadPoolExecutor(
-    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
-
-  val masterIp: String = System.getProperty("spark.master.host", "localhost")
-  val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
-  val masterActor = env.actorSystem.actorFor(
-    "akka://spark@%s:%s/user/%s".format(masterIp, masterPort, "CoarseMesosScheduler"))
-
-  class TaskRunner(desc: MTaskInfo)
-    extends Runnable {
-    override def run() {
-      val tid = desc.getTaskId.getValue
-      logInfo("Running task ID " + tid)
-      try {
-        SparkEnv.set(env)
-        Thread.currentThread.setContextClassLoader(classLoader)
-        Accumulators.clear
-        val task = Utils.deserialize[Task[Any]](desc.getData.toByteArray, classLoader)
-        env.mapOutputTracker.updateGeneration(task.generation)
-        val value = task.run(tid.toInt)
-        val accumUpdates = Accumulators.values
-        val result = new TaskResult(value, accumUpdates)
-        masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-          .setTaskId(desc.getTaskId)
-          .setState(MesosTaskState.TASK_FINISHED)
-          .setData(ByteString.copyFrom(Utils.serialize(result)))
-          .build())
-        logInfo("Finished task ID " + tid)
-      } catch {
-        case ffe: FetchFailedException => {
-          val reason = ffe.toTaskEndReason
-          masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-            .setTaskId(desc.getTaskId)
-            .setState(MesosTaskState.TASK_FAILED)
-            .setData(ByteString.copyFrom(Utils.serialize(reason)))
-            .build())
-        }
-        case t: Throwable => {
-          val reason = ExceptionFailure(t)
-          masterActor ! StatusUpdate(slaveId, TaskStatus.newBuilder()
-            .setTaskId(desc.getTaskId)
-            .setState(MesosTaskState.TASK_FAILED)
-            .setData(ByteString.copyFrom(Utils.serialize(reason)))
-            .build())
-
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + tid, t)
-          //System.exit(1)
-        }
-      }
-    }
-  }
-
-  override def preStart {
-    logInfo("Registering with master")
-    masterActor ! RegisterSlave(slaveId, host)
-  }
-
-  override def receive = {
-    case LaunchTask(slaveId_, task) =>
-      threadPool.execute(new TaskRunner(task))
-  }
-}
-
-*/
\ No newline at end of file
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
new file mode 100644
index 0000000000000000000000000000000000000000..040cd6b33506f7ae3bd87bd4ff9b92c36eba3fd9
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -0,0 +1,251 @@
+package spark.scheduler.mesos
+
+import com.google.protobuf.ByteString
+
+import org.apache.mesos.{Scheduler => MScheduler}
+import org.apache.mesos._
+import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+
+import spark.{SparkException, Utils, Logging, SparkContext}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import java.io.File
+import spark.scheduler.cluster._
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+import spark.TaskState
+
+/**
+ * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
+ * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
+ * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
+ * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
+ *
+ * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
+ * remove this.
+ */
+class CoarseMesosSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    master: String,
+    frameworkName: String)
+  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+  with MScheduler
+  with Logging {
+
+  // Environment variables to pass to our executors
+  val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+    "SPARK_MEM",
+    "SPARK_CLASSPATH",
+    "SPARK_LIBRARY_PATH",
+    "SPARK_JAVA_OPTS"
+  )
+
+  // Memory used by each executor (in megabytes)
+  val EXECUTOR_MEMORY = {
+    if (System.getenv("SPARK_MEM") != null) {
+      Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+      // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+    } else {
+      512
+    }
+  }
+
+  // Lock used to wait for scheduler to be registered
+  var isRegistered = false
+  val registeredLock = new Object()
+
+  // Driver for talking to Mesos
+  var driver: SchedulerDriver = null
+
+  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
+  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+  // Cores we have acquired with each Mesos task ID
+  val coresByTaskId = new HashMap[Int, Int]
+  var totalCoresAcquired = 0
+
+  val slaveIdsWithExecutors = new HashSet[String]
+
+  val sparkHome = sc.getSparkHome() match {
+    case Some(path) =>
+      path
+    case None =>
+      throw new SparkException("Spark home is not set; set it through the spark.home system " +
+        "property, the SPARK_HOME environment variable or the SparkContext constructor")
+  }
+
+  var nextMesosTaskId = 0
+
+  def newMesosTaskId(): Int = {
+    val id = nextMesosTaskId
+    nextMesosTaskId += 1
+    id
+  }
+
+  override def start() {
+    super.start()
+
+    synchronized {
+      new Thread("CoarseMesosSchedulerBackend driver") {
+        setDaemon(true)
+        override def run() {
+          val scheduler = CoarseMesosSchedulerBackend.this
+          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
+          try { {
+            val ret = driver.run()
+            logInfo("driver.run() returned with code " + ret)
+          }
+          } catch {
+            case e: Exception => logError("driver.run() failed", e)
+          }
+        }
+      }.start()
+
+      waitForRegister()
+    }
+  }
+
+  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
+    val runScript = new File(sparkHome, "run").getCanonicalPath
+    val masterUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+      StandaloneSchedulerBackend.ACTOR_NAME)
+    val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+      runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
+    val environment = Environment.newBuilder()
+    for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+      if (System.getenv(key) != null) {
+        environment.addVariables(Environment.Variable.newBuilder()
+          .setName(key)
+          .setValue(System.getenv(key))
+          .build())
+      }
+    }
+    return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+  }
+
+  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
+
+  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+    logInfo("Registered as framework ID " + frameworkId.getValue)
+    registeredLock.synchronized {
+      isRegistered = true
+      registeredLock.notifyAll()
+    }
+  }
+
+  def waitForRegister() {
+    registeredLock.synchronized {
+      while (!isRegistered) {
+        registeredLock.wait()
+      }
+    }
+  }
+
+  override def disconnected(d: SchedulerDriver) {}
+
+  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
+
+  /**
+   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
+   * unless we've already launched more than we wanted to.
+   */
+  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+    synchronized {
+      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
+
+      for (offer <- offers) {
+        val slaveId = offer.getSlaveId.toString
+        val mem = getResource(offer.getResourcesList, "mem")
+        val cpus = getResource(offer.getResourcesList, "cpus").toInt
+        if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 &&
+            !slaveIdsWithExecutors.contains(slaveId)) {
+          // Launch an executor on the slave
+          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
+          val taskId = newMesosTaskId()
+          val task = MesosTaskInfo.newBuilder()
+            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
+            .setSlaveId(offer.getSlaveId)
+            .setCommand(createCommand(offer, cpusToUse))
+            .setName("Task " + taskId)
+            .addResources(createResource("cpus", cpusToUse))
+            .addResources(createResource("mem", EXECUTOR_MEMORY))
+            .build()
+          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
+        } else {
+          // Filter it out
+          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
+        }
+      }
+    }
+  }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: JList[Resource], name: String): Double = {
+    for (r <- res if r.getName == name) {
+      return r.getScalar.getValue
+    }
+    // If we reached here, no resource with the required name was present
+    throw new IllegalArgumentException("No resource called " + name + " in " + res)
+  }
+
+  /** Build a Mesos resource protobuf object */
+  def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+  /** Check whether a Mesos task state represents a finished task */
+  def isFinished(state: MesosTaskState) = {
+    state == MesosTaskState.TASK_FINISHED ||
+      state == MesosTaskState.TASK_FAILED ||
+      state == MesosTaskState.TASK_KILLED ||
+      state == MesosTaskState.TASK_LOST
+  }
+
+  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+    val taskId = status.getTaskId.getValue.toInt
+    logInfo("Mesos task " + taskId + " is now " + status.getState)
+    synchronized {
+      if (isFinished(status.getState)) {
+        // Remove the cores we have remembered for this task, if it's in the hashmap
+        for (cores <- coresByTaskId.get(taskId)) {
+          totalCoresAcquired -= cores
+          coresByTaskId -= taskId
+          driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
+        }
+      }
+    }
+  }
+
+  override def error(d: SchedulerDriver, message: String) {
+    logError("Mesos error: " + message)
+    scheduler.error(message)
+  }
+
+  override def stop() {
+    super.stop()
+    if (driver != null) {
+      driver.stop()
+    }
+  }
+
+  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
+
+  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
+    logInfo("Mesos slave lost: " + slaveId.getValue)
+    synchronized {
+      slaveIdsWithExecutors -= slaveId.getValue
+    }
+  }
+
+  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
+    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
+    slaveLost(d, s)
+  }
+}
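
Orientation sketch (not part of the patch): for every offer it accepts, the coarse backend launches one long-lived Mesos task whose command simply starts a StandaloneExecutorBackend pointed back at the driver's actor; Spark tasks then flow over Akka as LaunchTask messages rather than as individual Mesos tasks. Roughly, the command assembled by createCommand looks like the following, where the path, host, port, actor name and IDs are placeholders:

    // Illustrative only: same format string as createCommand, filled with placeholder values.
    val exampleCommand = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
      "/opt/spark/run",                                   // <sparkHome>/run
      "akka://spark@driver-host:7077/user/<actor-name>",  // URL built from StandaloneSchedulerBackend.ACTOR_NAME
      "<mesos-slave-id>", "<hostname>", 4)                // offer identity plus cores granted
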
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 110b178582372b611cc3716d50e0728cf86111ac..44eda93dd15d301909fa26881b74bb26f5bbf9ee 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,6 +15,11 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 import spark.TaskState
 
+/**
+ * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
+ * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
+ * from multiple apps can run on different cores) and in time (a core can switch ownership).
+ */
 class MesosSchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
@@ -60,11 +65,10 @@ class MesosSchedulerBackend(
     synchronized {
       new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
-
         override def run() {
-          val sched = MesosSchedulerBackend.this
+          val scheduler = MesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
-          driver = new MesosSchedulerDriver(sched, fwInfo, master)
+          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
           try {
             val ret = driver.run()
             logInfo("driver.run() returned with code " + ret)
diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index fa9e6d62cb7704a86636420f8913142ac23d416e..0830843a77ad8c159f67df44288d46dcd29bff69 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,7 +8,7 @@ import java.nio.channels.Channels
  * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
  * it easier to pass ByteBuffers in case class messages.
  */
-class SerializableBuffer(@transient var buffer: ByteBuffer) {
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
   def value = buffer
 
   private def readObject(in: ObjectInputStream) {
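
Closing sketch (not part of the patch): extending Serializable is what lets a TaskDescription's task bytes ride inside Akka case-class messages such as LaunchTask(task); before this change, Java serialization would reject the wrapper with a NotSerializableException. A minimal round trip using only JDK serialization:

    // Hypothetical check: wrap a ByteBuffer and push it through Java serialization.
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer
    import spark.util.SerializableBuffer

    val payload = ByteBuffer.wrap("serialized task bytes".getBytes("UTF-8"))
    val out = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(out)
    oos.writeObject(new SerializableBuffer(payload))  // threw NotSerializableException before this patch
    oos.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray))
    val copy = in.readObject().asInstanceOf[SerializableBuffer]
    println("recovered " + copy.value.remaining + " bytes")  // buffer contents survive the trip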