diff --git a/core/src/main/scala/spark/deploy/Command.scala b/core/src/main/scala/spark/deploy/Command.scala
new file mode 100644
index 0000000000000000000000000000000000000000..344888919a6819c8d6a9645049eac4934600d5ce
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Command.scala
@@ -0,0 +1,9 @@
+package spark.deploy
+
+import scala.collection.Map
+
+case class Command(
+    mainClass: String,
+    arguments: Seq[String],
+    environment: Map[String, String]) {
+}
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 4e641157e1c838f516e0760bb02f344ce88d4290..c63f542bb0314fcfb71baa82b24a698f460f0c6a 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -1,6 +1,30 @@
 package spark.deploy
 
-sealed trait DeployMessage
+sealed trait DeployMessage extends Serializable
 
-case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends DeployMessage
-case class RegisteredSlave(clusterId: String, slaveId: Int) extends DeployMessage
+// Worker to Master
+
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
+  extends DeployMessage
+case class ExecutorStateChanged(jobId: String, execId: Int, state: ExecutorState.Value, message: String)
+  extends DeployMessage
+
+// Master to Worker
+
+case object RegisteredWorker extends DeployMessage
+case class RegisterWorkerFailed(message: String) extends DeployMessage
+case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription) extends DeployMessage
+
+// Client to Master
+
+case class RegisteredJob(jobId: String) extends DeployMessage
+
+// Master to Client
+
+case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
+case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) extends DeployMessage
+case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: String) extends DeployMessage
+
+// Internal message in Client
+
+case object StopClient
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f26609d8c9dcb53cf6059d4b483f6fea3dfc1316
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/ExecutorState.scala
@@ -0,0 +1,7 @@
+package spark.deploy
+
+object ExecutorState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED") {
+  val LAUNCHING, RUNNING, FINISHED, FAILED = Value
+
+  def isFinished(state: Value): Boolean = (state == FINISHED || state == FAILED)
+}
diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala
new file mode 100644
index 0000000000000000000000000000000000000000..545bcdcc74f657876d0ecb32c425a9c78c784693
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JobDescription.scala
@@ -0,0 +1,12 @@
+package spark.deploy
+
+class JobDescription(
+    val name: String,
+    val memoryPerSlave: Int,
+    val cores: Int,
+    val resources: Seq[String],
+    val command: Command)
+  extends Serializable {
+
+  val user = System.getProperty("user.name", "<unknown>")
+}
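As a quick orientation on how Command and JobDescription fit together, here is a small sketch (not part of the patch); the main class name, jar name, and environment entry are placeholders:

    import spark.deploy.{Command, JobDescription}

    object DescriptionSketch {
      def main(args: Array[String]) {
        // Command: main class to run on each executor, its arguments, and environment (all placeholders)
        val command = Command("my.app.ExecutorMain", Seq("arg1", "arg2"), Map("SPARK_MEM" -> "512m"))
        // JobDescription: name, memoryPerSlave (MB), cores, resources, command
        val desc = new JobDescription("my app", 512, 8, Seq("app.jar"), command)
        println(desc.user)  // defaults to the user.name system property
      }
    }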
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
new file mode 100644
index 0000000000000000000000000000000000000000..f8948049936326543b543846d47bff5d1dc44db6
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -0,0 +1,110 @@
+package spark.deploy.client
+
+import spark.deploy._
+import akka.actor._
+import akka.pattern.ask
+import akka.util.duration._
+import spark.{SparkException, Logging}
+import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.RemoteClientShutdown
+import spark.deploy.RegisterJob
+import akka.remote.RemoteClientDisconnected
+import akka.actor.Terminated
+import akka.dispatch.Await
+
+/**
+ * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
+ * and a listener for job events, and calls back the listener when various events occur.
+ */
+class Client(
+    actorSystem: ActorSystem,
+    masterUrl: String,
+    jobDescription: JobDescription,
+    listener: ClientListener)
+  extends Logging {
+
+  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
+
+  var actor: ActorRef = null
+  var jobId: String = null
+
+  if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
+    throw new SparkException("Invalid master URL: " + masterUrl)
+  }
+
+  class ClientActor extends Actor with Logging {
+    var master: ActorRef = null
+    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
+
+    override def preStart() {
+      val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
+      logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
+      val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
+      try {
+        master = context.actorFor(akkaUrl)
+        //master ! RegisterWorker(ip, port, cores, memory)
+        master ! RegisterJob(jobDescription)
+        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+        context.watch(master)  // Doesn't work with remote actors, but useful for testing
+      } catch {
+        case e: Exception =>
+          logError("Failed to connect to master", e)
+          markDisconnected()
+          context.stop(self)
+      }
+    }
+
+    override def receive = {
+      case RegisteredJob(jobId_) =>
+        jobId = jobId_
+        listener.connected(jobId)
+
+      case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
+        val fullId = jobId + "/" + id
+        logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
+        listener.executorAdded(fullId, workerId, host, cores, memory)
+
+      case ExecutorUpdated(id, state, message) =>
+        val fullId = jobId + "/" + id
+        val messageText = if (message == null) "" else " (" + message + ")"
+        logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
+        if (ExecutorState.isFinished(state)) {
+          listener.executorRemoved(fullId, message)
+        }
+
+      case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+        logError("Connection to master failed; stopping client")
+        markDisconnected()
+        context.stop(self)
+
+      case StopClient =>
+        markDisconnected()
+        sender ! true
+        context.stop(self)
+    }
+
+    /**
+     * Notify the listener that we disconnected, if we hadn't already done so before.
+     */
+    def markDisconnected() {
+      if (!alreadyDisconnected) {
+        listener.disconnected()
+        alreadyDisconnected = true
+      }
+    }
+  }
+
+  def start() {
+    // Just launch an actor; it will call back into the listener.
+    actor = actorSystem.actorOf(Props(new ClientActor))
+  }
+
+  def stop() {
+    if (actor != null) {
+      val timeout = 1.seconds
+      val future = actor.ask(StopClient)(timeout)
+      Await.result(future, timeout)
+      actor = null
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
new file mode 100644
index 0000000000000000000000000000000000000000..7d23baff32997f79b3ae65ff400514113ada827c
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -0,0 +1,18 @@
+package spark.deploy.client
+
+/**
+ * Callbacks invoked by deploy client when various events happen. There are currently four events:
+ * connecting to the cluster, disconnecting, being given an executor, and having an executor
+ * removed (either due to failure or due to revocation).
+ *
+ * Users of this API should *not* block inside the callback methods.
+ */
+trait ClientListener {
+  def connected(jobId: String): Unit
+
+  def disconnected(): Unit
+
+  def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
+
+  def executorRemoved(id: String, message: String): Unit
+}
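TestClient below uses a no-op listener; as a further hedged sketch (not part of this patch), a caller that wants to keep track of the executors it has been granted might implement the trait roughly like this:

    import scala.collection.mutable
    import spark.Logging
    import spark.deploy.client.ClientListener

    class TrackingListener extends ClientListener with Logging {
      val executorCores = new mutable.HashMap[String, Int]  // executor fullId -> cores granted

      def connected(jobId: String) {
        logInfo("Registered as job " + jobId)
      }

      def disconnected() {
        logInfo("Lost connection to the master")
      }

      def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
        executorCores.synchronized { executorCores(id) = cores }
      }

      def executorRemoved(id: String, message: String) {
        executorCores.synchronized { executorCores -= id }
      }
    }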
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e6d76f275188e2fc7cea6857e37b1c3d67f6b34b
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -0,0 +1,34 @@
+package spark.deploy.client
+
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+import spark.deploy.{Command, JobDescription}
+
+object TestClient {
+
+  class TestListener extends ClientListener with Logging {
+    def connected(id: String) {
+      logInfo("Connected to master, got job ID " + id)
+    }
+
+    def disconnected() {
+      logInfo("Disconnected from master")
+      System.exit(0)
+    }
+
+    def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
+
+    def executorRemoved(id: String, message: String) {}
+  }
+
+  def main(args: Array[String]) {
+    val url = args(0)
+    val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
+    val desc = new JobDescription("TestClient", 200, 1, Seq(),
+      Command("spark.deploy.client.TestExecutor", Seq(), Map()))
+    val listener = new TestListener
+    val client = new Client(actorSystem, url, desc, listener)
+    client.start()
+    actorSystem.awaitTermination()
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..335e00958c6e40fc07cbfcb3c9f306be9a985996
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
@@ -0,0 +1,15 @@
+package spark.deploy.master
+
+import spark.deploy.ExecutorState
+
+class ExecutorInfo(
+    val id: Int,
+    val job: JobInfo,
+    val worker: WorkerInfo,
+    val cores: Int,
+    val memory: Int) {
+
+  var state = ExecutorState.LAUNCHING
+
+  def fullId: String = job.id + "/" + id
+}
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..e8502f0b8f37bc329e459591926836793f7196d9
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import spark.deploy.JobDescription
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+  var state = JobState.WAITING
+  var executors = new mutable.HashMap[Int, ExecutorInfo]
+
+  var nextExecutorId = 0
+
+  def newExecutorId(): Int = {
+    val id = nextExecutorId
+    nextExecutorId += 1
+    id
+  }
+
+  def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+    val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+    executors(exec.id) = exec
+    exec
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3a69a37aca9c09366b30ccf61c4df5584c547a56
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -0,0 +1,5 @@
+package spark.deploy.master
+
+object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+  val WAITING, RUNNING, FINISHED, FAILED = Value
+}
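To make the ID bookkeeping concrete, the following illustrative-only snippet (not part of the patch) exercises JobInfo and ExecutorInfo directly; the IDs and sizes are made up, and the nulls stand in for ActorRefs:

    import java.util.Date
    import spark.deploy.{Command, JobDescription}
    import spark.deploy.master.{JobInfo, WorkerInfo}

    object JobInfoSketch {
      def main(args: Array[String]) {
        val desc = new JobDescription("sketch", 512, 4, Seq(), Command("my.Main", Seq(), Map()))
        val job = new JobInfo("job-20120701120000-0000", desc, new Date, null)
        val worker = new WorkerInfo("worker-20120701120000-10.0.0.1-7078", "10.0.0.1", 7078, 8, 8192, null)
        // Executor IDs are handed out per job, starting at 0
        val exec0 = job.newExecutor(worker, 2)
        val exec1 = job.newExecutor(worker, 2)
        println(exec0.fullId)  // job-20120701120000-0000/0
        println(exec1.fullId)  // job-20120701120000-0000/1
      }
    }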
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index cb0208e0b6f192eaf9bbb09134ee6c8165b47e8f..a21f156a5100e72c1f1dc2004a6ca685a753d335 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,47 +1,40 @@
 package spark.deploy.master
 
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import akka.actor._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import java.text.SimpleDateFormat
 import java.util.Date
-import spark.deploy.{RegisteredSlave, RegisterSlave}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.remote.RemoteClientLifeCycleEvent
+import spark.deploy._
 import akka.remote.RemoteClientShutdown
-import spark.deploy.RegisteredSlave
 import akka.remote.RemoteClientDisconnected
+import spark.deploy.RegisterWorker
+import spark.deploy.RegisterWorkerFailed
 import akka.actor.Terminated
-import scala.Some
-import spark.deploy.RegisterSlave
-
-class SlaveInfo(
-    val id: Int,
-    val host: String,
-    val port: Int,
-    val cores: Int,
-    val memory: Int,
-    val actor: ActorRef) {
-  var coresUsed = 0
-  var memoryUsed = 0
-
-  def coresFree: Int = cores - coresUsed
-
-  def memoryFree: Int = memory - memoryUsed
-}
 
 class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
-  val clusterId = newClusterId()
-  var nextSlaveId = 0
-  var nextJobId = 0
-  val slaves = new HashMap[Int, SlaveInfo]
-  val actorToSlave = new HashMap[ActorRef, SlaveInfo]
-  val addressToSlave = new HashMap[Address, SlaveInfo]
+  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For job IDs
+
+  var nextJobNumber = 0
+  val workers = new HashSet[WorkerInfo]
+  val idToWorker = new HashMap[String, WorkerInfo]
+  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
+  val addressToWorker = new HashMap[Address, WorkerInfo]
+
+  val jobs = new HashSet[JobInfo]
+  val idToJob = new HashMap[String, JobInfo]
+  val actorToJob = new HashMap[ActorRef, JobInfo]
+  val addressToJob = new HashMap[Address, JobInfo]
+
+  val waitingJobs = new ArrayBuffer[JobInfo]
+  val completedJobs = new ArrayBuffer[JobInfo]
 
   override def preStart() {
     logInfo("Starting Spark master at spark://" + ip + ":" + port)
-    logInfo("Cluster ID: " + clusterId)
+    // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     startWebUi()
   }
@@ -58,50 +51,141 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   }
 
   override def receive = {
-    case RegisterSlave(host, slavePort, cores, memory) => {
-      logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
-        host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
-      val slave = addSlave(host, slavePort, cores, memory)
+    case RegisterWorker(id, host, workerPort, cores, memory) => {
+      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+        host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
+      if (idToWorker.contains(id)) {
+        sender ! RegisterWorkerFailed("Duplicate worker ID")
+      } else {
+        val worker = addWorker(id, host, workerPort, cores, memory)
+        context.watch(sender)  // This doesn't work with remote actors but helps for testing
+        sender ! RegisteredWorker
+        schedule()
+      }
+    }
+
+    case RegisterJob(description) => {
+      logInfo("Registering job " + description.name)
+      val job = addJob(description, sender)
+      logInfo("Registered job " + description.name + " with ID " + job.id)
+      waitingJobs += job
       context.watch(sender)  // This doesn't work with remote actors but helps for testing
-      sender ! RegisteredSlave(clusterId, slave.id)
+      sender ! RegisteredJob(job.id)
+      schedule()
+    }
+
+    case ExecutorStateChanged(jobId, execId, state, message) => {
+      val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
+      execOption match {
+        case Some(exec) => {
+          exec.state = state
+          exec.job.actor ! ExecutorUpdated(execId, state, message)
+          if (ExecutorState.isFinished(state)) {
+            // Remove this executor from the worker and job
+            logInfo("Removing executor " + exec.fullId)
+            idToJob(jobId).executors -= exec.id
+            exec.worker.removeExecutor(exec)
+          }
+        }
+        case None =>
+          logWarning("Got status update for unknown executor " + jobId + "/" + execId)
+      }
+    }
+
+    case Terminated(actor) => {
+      // The disconnected actor could've been either a worker or a job; remove whichever of
+      // those we have an entry for in the corresponding actor hashmap
+      actorToWorker.get(actor).foreach(removeWorker)
+      actorToJob.get(actor).foreach(removeJob)
+    }
 
-    case RemoteClientDisconnected(transport, address) =>
-      logInfo("Remote client disconnected: " + address)
-      addressToSlave.get(address).foreach(s => removeSlave(s))  // Remove slave, if any, at address
+    case RemoteClientDisconnected(transport, address) => {
+      // The disconnected client could've been either a worker or a job; remove whichever it was
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToJob.get(address).foreach(removeJob)
+    }
+
+    case RemoteClientShutdown(transport, address) => {
+      // The disconnected client could've been either a worker or a job; remove whichever it was
+      addressToWorker.get(address).foreach(removeWorker)
+      addressToJob.get(address).foreach(removeJob)
+    }
+  }
+
+  /**
+   * Schedule the currently available resources among waiting jobs. This method will be called
+   * every time a new job joins or resource availability changes.
+   */
+  def schedule() {
+    // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
+    // in order of submission time and launching the first one that fits in the cluster.
+    // It's also not very efficient in terms of algorithmic complexity.
+    for (job <- waitingJobs) {
+      // Figure out how many cores the job could use on the whole cluster
+      val jobMemory = job.desc.memoryPerSlave
+      val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
+      if (usableCores >= job.desc.cores) {
+        // We can launch it! Let's just partition the workers into executors for this job.
+        // TODO: Probably want to spread stuff out across nodes more.
+        var coresLeft = job.desc.cores
+        for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) {
+          val coresToUse = math.min(worker.coresFree, coresLeft)
+          val exec = job.newExecutor(worker, coresToUse)
+          launchExecutor(worker, exec)
+          coresLeft -= coresToUse
+        }
+      }
+    }
+  }
 
-    case RemoteClientShutdown(transport, address) =>
-      logInfo("Remote client shutdown: " + address)
-      addressToSlave.get(address).foreach(s => removeSlave(s))  // Remove slave, if any, at address
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
+    worker.addExecutor(exec)
+    worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc)
+    exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
+  }
 
-    case Terminated(actor) =>
-      logInfo("Slave disconnected: " + actor)
-      actorToSlave.get(actor).foreach(s => removeSlave(s))  // Remove slave, if any, at actor
+  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+    idToWorker(worker.id) = worker
+    actorToWorker(sender) = worker
+    addressToWorker(sender.path.address) = worker
+    return worker
   }
 
-  def addSlave(host: String, slavePort: Int, cores: Int, memory: Int): SlaveInfo = {
-    val slave = new SlaveInfo(newSlaveId(), host, slavePort, cores, memory, sender)
-    slaves(slave.id) = slave
-    actorToSlave(sender) = slave
-    addressToSlave(sender.path.address) = slave
-    return slave
+  def removeWorker(worker: WorkerInfo) {
+    logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+    idToWorker -= worker.id
+    actorToWorker -= worker.actor
+    addressToWorker -= worker.actor.path.address
   }
 
-  def removeSlave(slave: SlaveInfo) {
-    logInfo("Removing slave " + slave.id + " on " + slave.host + ":" + slave.port)
-    slaves -= slave.id
-    actorToSlave -= slave.actor
-    addressToSlave -= slave.actor.path.address
+  def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
+    val date = new Date
+    val job = new JobInfo(newJobId(date), desc, date, actor)
+    idToJob(job.id) = job
+    actorToJob(sender) = job
+    addressToJob(sender.path.address) = job
+    return job
  }
 
-  def newClusterId(): String = {
-    val date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date())
-    "%s-%04d".format(date, (math.random * 10000).toInt)
+  def removeJob(job: JobInfo) {
+    logInfo("Removing job " + job.id)
+    idToJob -= job.id
+    actorToJob -= job.actor
+    addressToJob -= job.actor.path.address
+    completedJobs += job  // Remember it in our history
+    for (exec <- job.executors.values) {
+      exec.worker.removeExecutor(exec)
+    }
+    schedule()
   }
 
-  def newSlaveId(): Int = {
-    nextSlaveId += 1
-    nextSlaveId - 1
+  /** Generate a new job ID given a job's submission date */
+  def newJobId(submitDate: Date): String = {
+    val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
+    nextJobNumber += 1
+    jobId
   }
 }
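The scheduling comment above describes a simple FIFO with backfilling; as a rough standalone illustration of the resource check it performs (made-up numbers, not code from this patch), only workers with enough free memory contribute their free cores:

    object ScheduleSketch {
      case class W(coresFree: Int, memoryFree: Int)

      def main(args: Array[String]) {
        val workers = Seq(W(4, 1024), W(2, 128))  // two workers: free cores, free memory (MB)
        val memoryPerSlave = 512                  // what the waiting job asks for on each slave
        val wantedCores = 4
        // The second worker fails the memory filter, so usableCores is 4 and the job still fits
        val usableCores = workers.filter(_.memoryFree >= memoryPerSlave).map(_.coresFree).sum
        println(usableCores >= wantedCores)  // prints: true
      }
    }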
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
new file mode 100644
index 0000000000000000000000000000000000000000..af0be108ea2b4fb05c061d744dd7f080edadc675
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -0,0 +1,35 @@
+package spark.deploy.master
+
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+class WorkerInfo(
+    val id: String,
+    val host: String,
+    val port: Int,
+    val cores: Int,
+    val memory: Int,
+    val actor: ActorRef) {
+
+  var executors = new mutable.HashMap[String, ExecutorInfo]  // fullId => info
+
+  var coresUsed = 0
+  var memoryUsed = 0
+
+  def coresFree: Int = cores - coresUsed
+  def memoryFree: Int = memory - memoryUsed
+
+  def addExecutor(exec: ExecutorInfo) {
+    executors(exec.fullId) = exec
+    coresUsed += exec.cores
+    memoryUsed += exec.memory
+  }
+
+  def removeExecutor(exec: ExecutorInfo) {
+    if (executors.contains(exec.fullId)) {
+      executors -= exec.fullId
+      coresUsed -= exec.cores
+      memoryUsed -= exec.memory
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 22b070658d8896dedd9e55d3a7d3b4f4cba8af9e..1f5854011ffc215c46b54a1c6802484bd3f95d29 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,22 +2,22 @@ package spark.deploy.worker
 
 import akka.actor.{ActorRef, Terminated, Props, Actor}
-import akka.pattern.ask
-import akka.util.duration._
-import spark.{SparkException, Logging, Utils}
-import spark.util.{IntParam, AkkaUtils}
-import spark.deploy.{RegisterSlave, RegisteredSlave}
-import akka.dispatch.Await
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import spark.deploy.{RegisterWorkerFailed, RegisterWorker, RegisteredWorker}
 import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import java.text.SimpleDateFormat
+import java.util.Date
 
 class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
   extends Actor with Logging {
+  val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
 
   val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
 
   var master: ActorRef = null
-  var clusterId: String = null
-  var slaveId: Int = 0
+
+  val workerId = generateWorkerId()
   var coresUsed = 0
   var memoryUsed = 0
 
@@ -34,19 +34,20 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
 
   def connectToMaster() {
     masterUrl match {
-      case MASTER_REGEX(masterHost, masterPort) =>
+      case MASTER_REGEX(masterHost, masterPort) => {
         logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
        val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
        try {
          master = context.actorFor(akkaUrl)
-          master ! RegisterSlave(ip, port, cores, memory)
+          master ! RegisterWorker(workerId, ip, port, cores, memory)
          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-          context.watch(master) // Doesn't work with remote actors, but useful for testing
+          context.watch(master)  // Doesn't work with remote actors, but useful for testing
        } catch {
          case e: Exception =>
            logError("Failed to connect to master", e)
            System.exit(1)
        }
+      }
 
      case _ =>
        logError("Invalid master URL: " + masterUrl)
@@ -66,26 +67,27 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
   }
 
   override def receive = {
-    case RegisteredSlave(clusterId_, slaveId_) =>
-      this.clusterId = clusterId_
-      this.slaveId = slaveId_
-      logInfo("Registered with master, cluster ID = " + clusterId + ", slave ID = " + slaveId)
-
-    case RemoteClientDisconnected(_, _) =>
-      masterDisconnected()
+    case RegisteredWorker =>
+      logInfo("Successfully registered with master")
 
-    case RemoteClientShutdown(_, _) =>
-      masterDisconnected()
+    case RegisterWorkerFailed(message) =>
+      logError("Worker registration failed: " + message)
+      System.exit(1)
 
-    case Terminated(_) =>
+    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       masterDisconnected()
   }
 
   def masterDisconnected() {
-    // Not sure what to do here exactly, so just shut down for now.
+    // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
+    // (Note that if reconnecting we would also need to assign IDs differently.)
     logError("Connection to master failed! Shutting down.")
     System.exit(1)
   }
+
+  def generateWorkerId(): String = {
+    "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), ip, port)
+  }
 }
 
 object Worker {
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 3cf12ebe0e65cd9c784ab1c513b4238ecb9ebda8..57d212e4cab04a19f0eb3edee654e4e89d7236bd 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ object AkkaUtils {
     val server = actorSystem.actorOf(
       Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
     actorSystem.registerOnTermination { ioWorker.stop() }
-    val timeout = 1.seconds
+    val timeout = 3.seconds
     val future = server.ask(HttpServer.Bind(ip, port))(timeout)
     try {
       Await.result(future, timeout) match {
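For reference, a small illustrative snippet of the ID formats produced by generateWorkerId and newJobId above; the host, port, and timestamp are made-up values:

    import java.text.SimpleDateFormat
    import java.util.Date

    object IdFormatSketch {
      def main(args: Array[String]) {
        val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")
        val stamp = DATE_FORMAT.format(new Date)
        println("worker-%s-%s-%d".format(stamp, "10.0.0.1", 7078))  // e.g. worker-20120701120000-10.0.0.1-7078
        println("job-%s-%04d".format(stamp, 0))                     // e.g. job-20120701120000-0000
      }
    }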