Skip to content
Snippets Groups Projects
Commit c5cc10cd authored by Matei Zaharia
Browse files

More work on standalone scheduler

parent 909b3252
No related branches found
No related tags found
No related merge requests found
package spark.executor
import java.nio.ByteBuffer
import spark.Logging
import spark.TaskState.TaskState
import spark.util.AkkaUtils
import akka.actor.{ActorRef, Actor, Props}
import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
import akka.remote.RemoteClientLifeCycleEvent
import spark.scheduler.standalone._
import spark.scheduler.standalone.RegisteredSlave
import spark.scheduler.standalone.LaunchTask
import spark.scheduler.standalone.RegisterSlaveFailed
import spark.scheduler.standalone.RegisterSlave
/**
 * Akka actor that connects an [[Executor]] to the standalone scheduler's master actor.
 *
 * On start it registers this slave with the master found at `masterUrl`. Afterwards it
 * initializes the executor once registration succeeds, launches the tasks the master sends,
 * and forwards task status updates back to the master. If registration fails, or the master
 * terminates or disconnects, the process exits with status 1.
 *
 * @param executor  executor that is initialized on registration and runs the launched tasks
 * @param masterUrl Akka path of the master actor to register with
 * @param slaveId   ID sent to the master in RegisterSlave and StatusUpdate messages
 * @param hostname  hostname sent at registration and passed to the executor on initialization
 * @param cores     number of cores reported to the master at registration
 */
class StandaloneExecutorBackend(
    executor: Executor,
    masterUrl: String,
    slaveId: String,
    hostname: String,
    cores: Int)
  extends Actor
  with ExecutorBackend
  with Logging {

  // Only needed by the master-loss case in receive, so imported locally
  import akka.actor.Terminated
  import akka.remote.{RemoteClientDisconnected, RemoteClientShutdown}

  // Not referenced anywhere else in this class; kept because it is a public member
  val threadPool = new ThreadPoolExecutor(
    1, 128, 600, TimeUnit.SECONDS, new SynchronousQueue[Runnable])

  // Set in preStart(); null until the actor has started
  var master: ActorRef = null

  /** Looks up the master, registers this slave with it and starts monitoring the connection. */
  override def preStart() {
    try {
      master = context.actorFor(masterUrl)
      master ! RegisterSlave(slaveId, hostname, cores)
      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
      context.watch(master) // Doesn't work with remote actors, but useful for testing
    } catch {
      case e: Exception =>
        logError("Failed to connect to master", e)
        System.exit(1)
    }
  }

  /** Handles registration replies, task launches and loss of the master. */
  override def receive = {
    case RegisteredSlave(sparkProperties) =>
      logInfo("Successfully registered with master")
      executor.initialize(hostname, sparkProperties)

    case RegisterSlaveFailed(message) =>
      logError("Slave registration failed: " + message)
      System.exit(1)

    case LaunchTask(slaveId_, taskDesc) =>
      executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)

    // preStart() watches the master and subscribes to remote lifecycle events, but these
    // messages previously had no matching case. Without the master this backend can neither
    // receive tasks nor report results, so shut the process down.
    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      logError("Master terminated or disconnected! Shutting down.")
      System.exit(1)
  }

  /** Sends a task's new state and result data to the master as a StatusUpdate message. */
  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    master ! StatusUpdate(slaveId, taskId, state, data)
  }
}
/**
 * Entry point for running a StandaloneExecutorBackend actor in its own actor system.
 */
object StandaloneExecutorBackend {
  private val Usage =
    "Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>"

  /**
   * Starts an actor system on `hostname`, creates the backend actor in it and blocks until
   * the actor system terminates.
   *
   * @param masterUrl Akka path of the master actor
   * @param slaveId   ID this slave registers under
   * @param hostname  host the actor system is created for
   * @param cores     number of cores to report to the master
   */
  def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
    // Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
    // before getting started with all our system properties, etc
    val (actorSystem, _) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
    actorSystem.actorOf(
      Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)),
      name = "Executor")
    actorSystem.awaitTermination()
  }

  /**
   * Command-line entry point. Expects exactly four arguments:
   * `<master> <slaveId> <hostname> <cores>`. Prints the usage message and exits with
   * status 1 if the argument count is wrong or `<cores>` is not an integer.
   */
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(Usage)
      System.exit(1)
    }
    parseCores(args(3)) match {
      case Some(cores) =>
        run(args(0), args(1), args(2), cores)
      case None =>
        // Report a bad <cores> value the same way as a bad argument count rather than
        // dying with an uncaught NumberFormatException
        System.err.println("<cores> must be an integer, got: " + args(3))
        System.err.println(Usage)
        System.exit(1)
    }
  }

  /** Parses the `<cores>` argument, returning None when it is not a valid integer. */
  private def parseCores(value: String): Option[Int] = {
    try {
      Some(value.toInt)
    } catch {
      case e: NumberFormatException => None
    }
  }
}
package spark.scheduler.standalone
package spark.scheduler.cluster
import spark.TaskState.TaskState
import spark.scheduler.cluster.TaskDescription
import java.nio.ByteBuffer
import spark.util.SerializableBuffer
sealed trait StandaloneClusterMessage extends Serializable
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
// Master to slaves
/** Asks a slave to run `task`; the executor backend launches it from its ID and serialized body. */
case class LaunchTask(slaveId: String, task: TaskDescription) extends StandaloneClusterMessage
/** Registration succeeded; the executor backend passes `sparkProperties` to Executor.initialize. */
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
/** Registration was rejected; the executor backend logs `message` and exits the process. */
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: Array[Byte])
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
object StatusUpdate {
  /**
   * Convenience constructor for callers holding a plain ByteBuffer: the buffer is wrapped in a
   * SerializableBuffer before the message is built.
   */
  def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
    val wrapped = new SerializableBuffer(data)
    new StatusUpdate(slaveId, taskId, state, wrapped)
  }
}
// Internal messages in master
case object ReviveOffers extends StandaloneClusterMessage
case object StopMaster extends StandaloneClusterMessage
package spark.scheduler.standalone
package spark.scheduler.cluster
import scala.collection.mutable.{HashMap, HashSet}
......@@ -7,10 +7,7 @@ import akka.util.duration._
import akka.pattern.ask
import spark.{SparkException, Logging, TaskState}
import spark.TaskState.TaskState
import spark.scheduler.cluster.{WorkerOffer, ClusterScheduler, SchedulerBackend}
import akka.dispatch.Await
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
/**
......@@ -19,8 +16,7 @@ import java.util.concurrent.atomic.AtomicInteger
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend
with Logging {
extends SchedulerBackend with Logging {
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
......@@ -40,7 +36,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
makeOffers()
case StatusUpdate(slaveId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, ByteBuffer.wrap(data))
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
freeCores(slaveId) += 1
makeOffers(slaveId)
......@@ -90,7 +86,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
}
} catch {
case e: Exception =>
throw new SparkException("Error stopping standalone scheduler master actor", e)
throw new SparkException("Error stopping standalone scheduler's master actor", e)
}
}
......
package spark.scheduler.cluster
import java.nio.channels.Channels
import java.nio.ByteBuffer
import java.io.{IOException, EOFException, ObjectOutputStream, ObjectInputStream}
import spark.util.SerializableByteBuffer
import spark.util.SerializableBuffer
/**
 * Serializable description of a task: its ID, its name and its serialized body.
 *
 * The serialized body is supplied as a ByteBuffer and stored inside a serializable wrapper;
 * `serializedTask` hands back the wrapped buffer.
 */
class TaskDescription(val taskId: Long, val name: String, _serializedTask: ByteBuffer)
extends Serializable {
// Because ByteBuffers are not serializable, we wrap the task in a SerializableByteBuffer
private val buffer = new SerializableByteBuffer(_serializedTask)
// Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
private val buffer = new SerializableBuffer(_serializedTask)
/** The task's serialized body, read from the wrapper's `value`. */
def serializedTask: ByteBuffer = buffer.value
}
package spark.scheduler.mesos
/*
import java.io.{File, FileInputStream, FileOutputStream}
import java.util.{ArrayList => JArrayList}
import java.util.{List => JList}
......@@ -32,7 +33,6 @@ import spark._
import spark.scheduler._
import spark.scheduler.cluster.{TaskSetManager, ClusterScheduler}
/*
sealed trait CoarseMesosSchedulerMessage
case class RegisterSlave(slaveId: String, host: String) extends CoarseMesosSchedulerMessage
......
......@@ -80,7 +80,7 @@ class MesosSchedulerBackend(
}
def createExecutorInfo(): ExecutorInfo = {
val sparkHome = sc.getSparkHome match {
val sparkHome = sc.getSparkHome() match {
case Some(path) =>
path
case None =>
......
......@@ -5,9 +5,10 @@ import java.io.{IOException, ObjectOutputStream, EOFException, ObjectInputStream
import java.nio.channels.Channels
/**
* A wrapper around java.nio.ByteBuffer to make it serializable through Java serialization.
* A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
* it easier to pass ByteBuffers in case class messages.
*/
class SerializableByteBuffer(@transient var buffer: ByteBuffer) {
class SerializableBuffer(@transient var buffer: ByteBuffer) {
def value = buffer
private def readObject(in: ObjectInputStream) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment