Commit 51c46eac authored by Matei Zaharia

More work on standalone deploy system.

parent a6eb9fda
Showing
with 465 additions and 85 deletions
package spark.deploy
import scala.collection.Map
case class Command(
mainClass: String,
arguments: Seq[String],
environment: Map[String, String]) {
}
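A JobDescription (defined below) carries one of these Command values, and the master forwards it to workers inside the JobDescription carried by LaunchExecutor so they can start the executor process. A minimal, illustrative construction with hypothetical arguments and environment (only the TestExecutor class name also appears elsewhere in this commit):
import spark.deploy.Command
object CommandExample {
  // All values below are made up for illustration.
  val example = Command(
    "spark.deploy.client.TestExecutor",   // main class the worker should run
    Seq("--someArg", "value"),            // hypothetical command-line arguments
    Map("SPARK_MEM" -> "512m"))           // hypothetical environment for the executor process
}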
package spark.deploy
sealed trait DeployMessage extends Serializable
// Worker to Master
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
extends DeployMessage
case class ExecutorStateChanged(jobId: String, execId: Int, state: ExecutorState.Value, message: String)
extends DeployMessage
// Master to Worker
case object RegisteredWorker extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription) extends DeployMessage
// Client to Master
case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
// Master to Client
case class RegisteredJob(jobId: String) extends DeployMessage
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: String)
// Internal message in Client
case object StopClient
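To make the message protocol above concrete, here is a small illustrative sketch (not part of the commit) that builds a worker-registration request and pattern-matches the two replies a master can send; the ID, host, and resource figures are invented, following the worker-<date>-<ip>-<port> shape that Worker.generateWorkerId produces later in this commit.
import spark.deploy._
object DeployMessageExample {
  def main(args: Array[String]) {
    // A worker announces itself with its ID, contact address, and resources (example values).
    val request = RegisterWorker("worker-20120801224533-10.0.0.1-7078", "10.0.0.1", 7078, 4, 4096)
    // The master answers with either RegisteredWorker or RegisterWorkerFailed.
    val reply: DeployMessage = RegisteredWorker
    reply match {
      case RegisteredWorker => println("Worker " + request.id + " registered")
      case RegisterWorkerFailed(message) => println("Registration failed: " + message)
      case _ => // no other message is sent in reply to RegisterWorker
    }
  }
}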
package spark.deploy
object ExecutorState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED") {
val LAUNCHING, RUNNING, FINISHED, FAILED = Value
def isFinished(state: Value): Boolean = (state == FINISHED || state == FAILED)
}
package spark.deploy
class JobDescription(
val name: String,
val memoryPerSlave: Int,
val cores: Int,
val resources: Seq[String],
val command: Command)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
}
package spark.deploy.client
import akka.actor._
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.{RemoteClientDisconnected, RemoteClientLifeCycleEvent, RemoteClientShutdown}
import akka.util.duration._
import spark.deploy._
import spark.{SparkException, Logging}
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
* and a listener for job events, and calls back the listener when various events occur.
*/
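// For a runnable example of driving this class, see spark.deploy.client.TestClient below.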
class Client(
actorSystem: ActorSystem,
masterUrl: String,
jobDescription: JobDescription,
listener: ClientListener)
extends Logging {
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var actor: ActorRef = null
var jobId: String = null
if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
throw new SparkException("Invalid master URL: " + masterUrl)
}
class ClientActor extends Actor with Logging {
var master: ActorRef = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
override def receive = {
case RegisteredJob(jobId_) =>
jobId = jobId_
listener.connected(jobId)
case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
val fullId = jobId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
listener.executorAdded(fullId, workerId, host, cores, memory)
case ExecutorUpdated(id, state, message) =>
val fullId = jobId + "/" + id
val messageText = if (message == null) "" else " (" + message + ")"
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
listener.executorRemoved(fullId, message)
}
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)
case StopClient =>
markDisconnected()
sender ! true
context.stop(self)
}
/**
* Notify the listener that we disconnected, if we hadn't already done so before.
*/
def markDisconnected() {
if (!alreadyDisconnected) {
listener.disconnected()
alreadyDisconnected = true
}
}
}
def start() {
// Just launch an actor; it will call back into the listener.
actor = actorSystem.actorOf(Props(new ClientActor))
}
def stop() {
if (actor != null) {
val timeout = 1.seconds
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
actor = null
}
}
}
package spark.deploy.client
/**
* Callbacks invoked by deploy client when various events happen. There are currently four events:
* connecting to the cluster, disconnecting, being given an executor, and having an executor
* removed (either due to failure or due to revocation).
*
* Users of this API should *not* block inside the callback methods.
*/
trait ClientListener {
def connected(jobId: String): Unit
def disconnected(): Unit
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
def executorRemoved(id: String, message: String): Unit
}
package spark.deploy.client
import spark.util.AkkaUtils
import spark.{Logging, Utils}
import spark.deploy.{Command, JobDescription}
object TestClient {
class TestListener extends ClientListener with Logging {
def connected(id: String) {
logInfo("Connected to master, got job ID " + id)
}
def disconnected() {
logInfo("Disconnected from master")
System.exit(0)
}
def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
def executorRemoved(id: String, message: String) {}
}
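// Usage: TestClient <masterUrl>, where <masterUrl> is a spark://host:port URL for a running standalone master.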
def main(args: Array[String]) {
val url = args(0)
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
val desc = new JobDescription("TestClient", 200, 1, Seq(),
Command("spark.deploy.client.TestExecutor", Seq(), Map()))
val listener = new TestListener
val client = new Client(actorSystem, url, desc, listener)
client.start()
actorSystem.awaitTermination()
}
}
package spark.deploy.master
import spark.deploy.ExecutorState
class ExecutorInfo(
val id: Int,
val job: JobInfo,
val worker: WorkerInfo,
val cores: Int,
val memory: Int) {
var state = ExecutorState.LAUNCHING
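// fullId looks like "job-20120801224533-0000/2" (example values): the owning job's ID plus this executor's number.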
def fullId: String = job.id + "/" + id
}
package spark.deploy.master
import spark.deploy.JobDescription
import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var nextExecutorId = 0
def newExecutorId(): Int = {
val id = nextExecutorId
nextExecutorId += 1
id
}
def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
exec
}
}
package spark.deploy.master
object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
val WAITING, RUNNING, FINISHED, FAILED = Value
}
package spark.deploy.master
import akka.actor._
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import spark.deploy._
import spark.util.AkkaUtils
import spark.{Logging, Utils}
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
var nextJobNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
val jobs = new HashSet[JobInfo]
val idToJob = new HashMap[String, JobInfo]
val actorToJob = new HashMap[ActorRef, JobInfo]
val addressToJob = new HashMap[Address, JobInfo]
val waitingJobs = new ArrayBuffer[JobInfo]
val completedJobs = new ArrayBuffer[JobInfo]
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
logInfo("Cluster ID: " + clusterId)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
}
@@ -58,50 +51,141 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
override def receive = {
case RegisterWorker(id, host, workerPort, cores, memory) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = addWorker(id, host, workerPort, cores, memory)
context.watch(sender) // This doesn't work with remote actors but helps for testing
sender ! RegisteredWorker
schedule()
}
}
case RegisterJob(description) => {
logInfo("Registering job " + description.name)
val job = addJob(description, sender)
logInfo("Registered job " + description.name + " with ID " + job.id)
waitingJobs += job
context.watch(sender) // This doesn't work with remote actors but helps for testing
sender ! RegisteredJob(job.id)
schedule()
}
case ExecutorStateChanged(jobId, execId, state, message) => {
val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
execOption match {
case Some(exec) => {
exec.state = state
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId)
idToJob(jobId).executors -= exec.id
exec.worker.removeExecutor(exec)
}
}
case None =>
logWarning("Got status update for unknown executor " + jobId + "/" + execId)
}
}
case Terminated(actor) => {
// The disconnected actor could've been either a worker or a job; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToJob.get(actor).foreach(removeJob)
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or a job; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
}
/**
* Schedule the currently available resources among waiting jobs. This method will be called
* every time a new job joins or resource availability changes.
*/
def schedule() {
// Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
// in order of submission time and launching the first one that fits in the cluster.
// It's also not very efficient in terms of algorithmic complexity.
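// For example (illustrative numbers): with three registered workers that each have 4 free cores and
// 1024 MB free, a waiting job with memoryPerSlave = 512 and cores = 6 sees usableCores = 12, so it is
// launched right away with a 4-core executor on the first worker and a 2-core executor on the second;
// a job asking for more cores than are currently free stays in waitingJobs until schedule() runs again.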
for (job <- waitingJobs) {
// Figure out how many cores the job could use on the whole cluster
val jobMemory = job.desc.memoryPerSlave
val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
if (usableCores >= job.desc.cores) {
// We can launch it! Let's just partition the workers into executors for this job.
// TODO: Probably want to spread stuff out across nodes more.
var coresLeft = job.desc.cores
for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) {
val coresToUse = math.min(worker.coresFree, coresLeft)
val exec = job.newExecutor(worker, coresToUse)
launchExecutor(worker, exec)
coresLeft -= coresToUse
}
}
}
}
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc)
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
val worker = new WorkerInfo(id, host, port, cores, memory, sender)
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
return worker
}
def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
idToWorker -= worker.id
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
val date = new Date
val job = new JobInfo(newJobId(date), desc, date, actor)
idToJob(job.id) = job
actorToJob(sender) = job
addressToJob(sender.path.address) = job
return job
}
def removeJob(job: JobInfo) {
logInfo("Removing job " + job.id)
idToJob -= job.id
actorToJob -= job.actor
addressToJob -= job.actor.path.address
completedJobs += job // Remember it in our history
for (exec <- job.executors.values) {
exec.worker.removeExecutor(exec) // free the cores and memory this executor held on its worker
}
schedule()
}
/** Generate a new job ID given a job's submission date */
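// With the format below this produces IDs like "job-20120801224533-0000" (example timestamp).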
def newJobId(submitDate: Date): String = {
val jobId = "job-%s-%4d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
nextJobNumber += 1
jobId
}
}
// ...
package spark.deploy.master
import akka.actor.ActorRef
import scala.collection.mutable
class WorkerInfo(
val id: String,
val host: String,
val port: Int,
val cores: Int,
val memory: Int,
val actor: ActorRef) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
var coresUsed = 0
var memoryUsed = 0
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
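// The two methods below are called by the Master when it launches or removes an executor on this
// worker; they keep the free-core and free-memory counters that Master.schedule() checks up to date.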
def addExecutor(exec: ExecutorInfo) {
executors(exec.fullId) = exec
coresUsed += exec.cores
memoryUsed += exec.memory
}
def removeExecutor(exec: ExecutorInfo) {
if (executors.contains(exec.fullId)) {
executors -= exec.fullId
coresUsed -= exec.cores
memoryUsed -= exec.memory
}
}
}
@@ -2,22 +2,22 @@ package spark.deploy.worker
import akka.actor.{ActorRef, Terminated, Props, Actor}
import akka.pattern.ask
import akka.util.duration._
import akka.dispatch.Await
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import spark.deploy.{RegisterWorkerFailed, RegisterWorker, RegisteredWorker}
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import java.text.SimpleDateFormat
import java.util.Date
class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
val workerId = generateWorkerId()
var coresUsed = 0
var memoryUsed = 0
@@ -34,19 +34,20 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
def connectToMaster() {
masterUrl match {
case MASTER_REGEX(masterHost, masterPort) => {
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
master ! RegisterWorker(workerId, ip, port, cores, memory)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
System.exit(1)
}
}
case _ =>
logError("Invalid master URL: " + masterUrl)
@@ -66,26 +67,27 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
}
override def receive = {
case RegisteredWorker =>
logInfo("Successfully registered with master")
case RegisterWorkerFailed(message) =>
logError("Worker registration failed: " + message)
System.exit(1)
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
}
def masterDisconnected() {
// TODO: It would be nice to try to reconnect to the master, but just shut down for now.
// (Note that if reconnecting we would also need to assign IDs differently.)
logError("Connection to master failed! Shutting down.")
System.exit(1)
}
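// Worker IDs embed the start time, IP address, and port, e.g. "worker-20120801224533-10.0.0.1-7078" (example values).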
def generateWorkerId(): String = {
"worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), ip, port)
}
}
object Worker {
// ...
@@ -53,7 +53,7 @@ object AkkaUtils {
val server = actorSystem.actorOf(
Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
actorSystem.registerOnTermination { ioWorker.stop() }
val timeout = 3.seconds
val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {
// ...