Commit 909b3252 authored by Matei Zaharia

Further refactoring, and start of a standalone scheduler backend

parent 4e2fe0bd
Showing 211 additions and 29 deletions
core/src/main/scala/spark/SparkContext.scala
@@ -42,7 +42,7 @@ import spark.scheduler.DAGScheduler
 import spark.scheduler.TaskScheduler
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.cluster.ClusterScheduler
-import spark.scheduler.mesos.MesosScheduler
+import spark.scheduler.mesos.MesosSchedulerBackend
 import spark.storage.BlockManagerMaster
 
 class SparkContext(
@@ -90,14 +90,14 @@ class SparkContext(
       case _ =>
         MesosNativeLibrary.load()
         val sched = new ClusterScheduler(this)
-        val schedContext = new MesosScheduler(sched, this, master, frameworkName)
+        val schedContext = new MesosSchedulerBackend(sched, this, master, frameworkName)
         sched.initialize(schedContext)
         sched
         /*
         if (System.getProperty("spark.mesos.coarse", "false") == "true") {
           new CoarseMesosScheduler(this, master, frameworkName)
         } else {
-          new MesosScheduler(this, master, frameworkName)
+          new MesosSchedulerBackend(this, master, frameworkName)
         }
         */
     }
...
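The hunk above is the new plug-in seam: SparkContext builds a shared ClusterScheduler and hands it a backend via initialize(). A hedged sketch of how the standalone backend introduced later in this commit could be wired through the same seam (assumes an ActorSystem named actorSystem is in scope; this wiring is not part of the commit):

// Hypothetical wiring, mirroring the Mesos case above: any SchedulerBackend
// can be plugged under the shared ClusterScheduler.
val sched = new ClusterScheduler(this)
val backend = new StandaloneSchedulerBackend(sched, actorSystem)  // actorSystem: assumed available
sched.initialize(backend)
sched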
core/src/main/scala/spark/executor/Executor.scala
@@ -47,11 +47,11 @@ class Executor extends Logging {
     1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])
   }
 
-  def launchTask(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer) {
+  def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
     threadPool.execute(new TaskRunner(context, taskId, serializedTask))
   }
 
-  class TaskRunner(context: ExecutorContext, taskId: Long, serializedTask: ByteBuffer)
+  class TaskRunner(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer)
     extends Runnable {
 
     override def run() {
...
core/src/main/scala/spark/executor/ExecutorContext.scala → core/src/main/scala/spark/executor/ExecutorBackend.scala
@@ -4,8 +4,8 @@ import java.nio.ByteBuffer
 import spark.TaskState.TaskState
 
 /**
- * Interface used by Executor to send back updates to the cluster scheduler.
+ * A pluggable interface used by the Executor to send updates to the cluster scheduler.
  */
-trait ExecutorContext {
+trait ExecutorBackend {
   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
 }
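statusUpdate is the entire contract an execution environment must satisfy for the Executor. As a minimal sketch (LoggingExecutorBackend is hypothetical, not in this commit), a backend that only prints updates could look like:

import java.nio.ByteBuffer
import spark.TaskState.TaskState
import spark.executor.ExecutorBackend  // assumed package, per the spark.executor entry point below

// A trivial ExecutorBackend that prints status changes instead of forwarding them
// to a cluster scheduler; useful only as an illustration of the trait's role.
class LoggingExecutorBackend extends ExecutorBackend {
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    println("task " + taskId + " -> " + state + " (" + data.remaining + " bytes of data)")
  }
}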
core/src/main/scala/spark/executor/MesosExecutorRunner.scala → core/src/main/scala/spark/executor/MesosExecutorBackend.scala
@@ -8,9 +8,9 @@ import com.google.protobuf.ByteString
 import spark.{Utils, Logging}
 import spark.TaskState
 
-class MesosExecutorRunner(executor: Executor)
+class MesosExecutorBackend(executor: Executor)
   extends MesosExecutor
-  with ExecutorContext
+  with ExecutorBackend
   with Logging {
 
   var driver: ExecutorDriver = null
@@ -59,11 +59,11 @@ class MesosExecutorRunner(executor: Executor)
 /**
  * Entry point for Mesos executor.
  */
-object MesosExecutorRunner {
+object MesosExecutorBackend {
   def main(args: Array[String]) {
     MesosNativeLibrary.load()
     // Create a new Executor and start it running
-    val runner = new MesosExecutorRunner(new Executor)
+    val runner = new MesosExecutorBackend(new Executor)
     new MesosExecutorDriver(runner).run()
   }
 }
core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -50,7 +50,7 @@ class ClusterScheduler(sc: SparkContext)
 
   // Listener object to pass upcalls into
   var listener: TaskSchedulerListener = null
-  var schedContext: ClusterSchedulerContext = null
+  var backend: SchedulerBackend = null
 
   val mapOutputTracker = SparkEnv.get.mapOutputTracker
 
@@ -58,15 +58,15 @@ class ClusterScheduler(sc: SparkContext)
     this.listener = listener
   }
 
-  def initialize(context: ClusterSchedulerContext) {
-    schedContext = context
+  def initialize(context: SchedulerBackend) {
+    backend = context
     createJarServer()
   }
 
   def newTaskId(): Long = nextTaskId.getAndIncrement()
 
   override def start() {
-    schedContext.start()
+    backend.start()
 
     if (System.getProperty("spark.speculation", "false") == "true") {
       new Thread("ClusterScheduler speculation check") {
@@ -95,7 +95,7 @@ class ClusterScheduler(sc: SparkContext)
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
     }
-    schedContext.reviveOffers()
+    backend.reviveOffers()
   }
 
   def taskSetFinished(manager: TaskSetManager) {
@@ -197,11 +197,11 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
       listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
     if (taskFailed) {
       // Also revive offers if a task had failed for some reason other than host lost
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -227,15 +227,15 @@ class ClusterScheduler(sc: SparkContext)
   }
 
   override def stop() {
-    if (schedContext != null) {
-      schedContext.stop()
+    if (backend != null) {
+      backend.stop()
     }
     if (jarServer != null) {
       jarServer.stop()
     }
   }
 
-  override def defaultParallelism() = schedContext.defaultParallelism()
+  override def defaultParallelism() = backend.defaultParallelism()
 
   // Create a server for all the JARs added by the user to SparkContext.
   // We first copy the JARs to a temp directory for easier server setup.
@@ -271,7 +271,7 @@ class ClusterScheduler(sc: SparkContext)
       }
     }
     if (shouldRevive) {
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 
@@ -288,7 +288,7 @@ class ClusterScheduler(sc: SparkContext)
     }
     if (failedHost != None) {
      listener.hostLost(failedHost.get)
-      schedContext.reviveOffers()
+      backend.reviveOffers()
     }
   }
 }
core/src/main/scala/spark/scheduler/cluster/ClusterSchedulerContext.scala → core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
 package spark.scheduler.cluster
 
-trait ClusterSchedulerContext {
+/**
+ * A backend interface for cluster scheduling systems that allows plugging in different ones under
+ * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * machines become available and can launch tasks on them.
+ */
+trait SchedulerBackend {
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
...
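The trait is deliberately tiny, and since ClusterScheduler above also calls backend.defaultParallelism(), the truncated body presumably declares that method as well. A do-nothing sketch (hypothetical, not in the commit) shows the minimum a backend must provide:

import spark.scheduler.cluster.SchedulerBackend

// Hypothetical no-op backend: it satisfies the trait but never produces resource
// offers, so no tasks would ever launch. Real backends respond to reviveOffers()
// by feeding WorkerOffers into ClusterScheduler.resourceOffers(), as the Mesos and
// standalone backends below do.
class NoopSchedulerBackend extends SchedulerBackend {
  def start() {}
  def stop() {}
  def reviveOffers() {}
  def defaultParallelism(): Int = 1  // assumption: declared by the trait, per its use above
}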
core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
 package spark.scheduler.cluster
 
+import java.nio.channels.Channels
 import java.nio.ByteBuffer
+import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
+import spark.util.SerializableByteBuffer
 
-class TaskDescription(val taskId: Long, val name: String, val serializedTask: ByteBuffer) {}
+class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
+  extends Serializable {
+
+  // Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
+  private val buffer = new SerializableByteBuffer(_serializedTask)
+
+  def serializedTask: ByteBuffer = buffer.value
+}
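The wrapper matters because java.nio.ByteBuffer does not implement java.io.Serializable, so writing one to an ObjectOutputStream fails at runtime, while callers of serializedTask still see a plain ByteBuffer. A small illustration (the demo object is hypothetical, not in the commit):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import spark.scheduler.cluster.TaskDescription

object TaskDescriptionDemo {
  def main(args: Array[String]) {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    // out.writeObject(ByteBuffer.allocate(8))  // would throw java.io.NotSerializableException
    out.writeObject(new TaskDescription(1L, "demo task", ByteBuffer.allocate(8)))  // fine: wrapped
    out.close()
  }
}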
core/src/main/scala/spark/scheduler/mesos/MesosScheduler.scala → core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -15,12 +15,12 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 import spark.TaskState
 
-class MesosScheduler(
+class MesosSchedulerBackend(
     scheduler: ClusterScheduler,
     sc: SparkContext,
     master: String,
     frameworkName: String)
-  extends ClusterSchedulerContext
+  extends SchedulerBackend
   with MScheduler
   with Logging {
@@ -58,11 +58,11 @@ class MesosScheduler(
   override def start() {
     synchronized {
-      new Thread("MesosScheduler driver") {
+      new Thread("MesosSchedulerBackend driver") {
         setDaemon(true)
 
         override def run() {
-          val sched = MesosScheduler.this
+          val sched = MesosSchedulerBackend.this
           val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(frameworkName).build()
           driver = new MesosSchedulerDriver(sched, fwInfo, master)
           try {
...
core/src/main/scala/spark/scheduler/standalone/StandaloneClusterMessage.scala (new file)
package spark.scheduler.standalone

import spark.TaskState.TaskState
import spark.scheduler.cluster.TaskDescription

sealed trait StandaloneClusterMessage extends Serializable

case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage

case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
  extends StandaloneClusterMessage

case object ReviveOffers extends StandaloneClusterMessage
case object StopMaster extends StandaloneClusterMessage
core/src/main/scala/spark/scheduler/standalone/StandaloneSchedulerBackend.scala (new file)
package spark.scheduler.standalone

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable.{HashMap, HashSet}

import akka.actor.{Props, Actor, ActorRef, ActorSystem}
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._

import spark.{SparkException, Logging, TaskState}
import spark.TaskState.TaskState
import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}

/**
 * A standalone scheduler backend, which waits for standalone executors to connect to it through
 * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
 * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
 */
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
  extends SchedulerBackend
  with Logging {

  // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
  var totalCoreCount = new AtomicInteger(0)

  class MasterActor extends Actor {
    val slaveActor = new HashMap[String, ActorRef]
    val slaveHost = new HashMap[String, String]
    val freeCores = new HashMap[String, Int]

    def receive = {
      case RegisterSlave(slaveId, host, cores) =>
        slaveActor(slaveId) = sender
        logInfo("Registered slave: " + sender + " with ID " + slaveId)
        slaveHost(slaveId) = host
        freeCores(slaveId) = cores
        totalCoreCount.addAndGet(cores)
        makeOffers()

      case StatusUpdate(slaveId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
        if (TaskState.isFinished(state)) {
          freeCores(slaveId) += 1
          makeOffers(slaveId)
        }

      case LaunchTask(slaveId, task) =>
        freeCores(slaveId) -= 1
        slaveActor(slaveId) ! LaunchTask(slaveId, task)

      case ReviveOffers =>
        makeOffers()

      case StopMaster =>
        sender ! true
        context.stop(self)

      // TODO: Deal with nodes disconnecting too! (Including decreasing totalCoreCount)
    }

    // Make fake resource offers on all slaves
    def makeOffers() {
      scheduler.resourceOffers(
        slaveHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})
    }

    // Make fake resource offers on just one slave
    def makeOffers(slaveId: String) {
      scheduler.resourceOffers(
        Seq(new WorkerOffer(slaveId, slaveHost(slaveId), freeCores(slaveId))))
    }
  }

  var masterActor: ActorRef = null
  val taskIdsOnSlave = new HashMap[String, HashSet[String]]

  def start() {
    masterActor = actorSystem.actorOf(
      Props(new MasterActor), name = StandaloneSchedulerBackend.ACTOR_NAME)
  }

  def stop() {
    try {
      if (masterActor != null) {
        val timeout = 5.seconds
        val future = masterActor.ask(StopMaster)(timeout)
        Await.result(future, timeout)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler master actor", e)
    }
  }

  def reviveOffers() {
    masterActor ! ReviveOffers
  }

  def defaultParallelism(): Int = totalCoreCount.get()
}

object StandaloneSchedulerBackend {
  val ACTOR_NAME = "StandaloneScheduler"
}
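This commit only contains the master side; nothing here registers slaves yet, hence "start of" in the commit message. A hypothetical sketch of a missing slave-side actor, purely to show the intended message flow against the sealed messages above:

import akka.actor.{Actor, ActorRef}
import spark.TaskState
import spark.scheduler.standalone.{RegisterSlave, LaunchTask, StatusUpdate}

// Hypothetical slave-side actor (not in this commit): registers its cores with the
// master actor, then acknowledges launched tasks. A real executor process would
// deserialize and run each task and report its true state and result bytes.
class StandaloneSlaveActor(master: ActorRef, slaveId: String, host: String, cores: Int)
  extends Actor {

  override def preStart() {
    master ! RegisterSlave(slaveId, host, cores)  // triggers makeOffers() on the master
  }

  def receive = {
    case LaunchTask(_, task) =>
      // Placeholder: immediately report the task finished with an empty result.
      master ! StatusUpdate(slaveId, task.taskId, TaskState.FINISHED, Array[Byte]())
  }
}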
core/src/main/scala/spark/util/SerializableByteBuffer.scala (new file)
package spark.util

import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream}
import java.nio.ByteBuffer
import java.nio.channels.Channels

/**
 * A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
 */
class SerializableByteBuffer(@transient var buffer: ByteBuffer) extends Serializable {
  def value = buffer

  private def readObject(in: ObjectInputStream) {
    val length = in.readInt()
    buffer = ByteBuffer.allocate(length)
    var amountRead = 0
    val channel = Channels.newChannel(in)
    while (amountRead < length) {
      val ret = channel.read(buffer)
      if (ret == -1) {
        throw new EOFException("End of file before fully reading buffer")
      }
      amountRead += ret
    }
    buffer.rewind() // Allow us to read it later
  }

  private def writeObject(out: ObjectOutputStream) {
    out.writeInt(buffer.limit())
    if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
      throw new IOException("Could not fully write buffer to output stream")
    }
    buffer.rewind() // Allow us to write it again later
  }
}
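A round trip through Java serialization exercises the two hooks above: writeObject emits a length prefix plus the raw bytes, and readObject rebuilds the buffer from them. A sketch (the demo object is hypothetical, and it assumes the class is marked Serializable as shown above):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import spark.util.SerializableByteBuffer

object SerializableByteBufferDemo {
  def main(args: Array[String]) {
    val original = new SerializableByteBuffer(ByteBuffer.wrap("task bytes".getBytes("UTF-8")))
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(original)  // invokes writeObject: length prefix + buffer contents
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = ois.readObject().asInstanceOf[SerializableByteBuffer]
    assert(copy.value.remaining() == original.value.remaining())  // contents restored and rewound
  }
}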
spark-executor
 #!/bin/sh
 FWDIR="`dirname $0`"
 echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/run spark.executor.MesosExecutorRunner
+exec $FWDIR/run spark.executor.MesosExecutorBackend