Skip to content
Snippets Groups Projects
Commit a6eb9fda authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Detect connection and disconnection of slaves

parent 408b5a13
No related branches found
No related tags found
No related merge requests found
...@@ -2,12 +2,19 @@ package spark.deploy.master ...@@ -2,12 +2,19 @@ package spark.deploy.master
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
import akka.actor.{Terminated, ActorRef, Props, Actor} import akka.actor._
import spark.{Logging, Utils} import spark.{Logging, Utils}
import spark.util.AkkaUtils import spark.util.AkkaUtils
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
import spark.deploy.{RegisteredSlave, RegisterSlave} import spark.deploy.{RegisteredSlave, RegisterSlave}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.remote.RemoteClientShutdown
import spark.deploy.RegisteredSlave
import akka.remote.RemoteClientDisconnected
import akka.actor.Terminated
import scala.Some
import spark.deploy.RegisterSlave
class SlaveInfo( class SlaveInfo(
val id: Int, val id: Int,
...@@ -30,10 +37,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { ...@@ -30,10 +37,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
var nextJobId = 0 var nextJobId = 0
val slaves = new HashMap[Int, SlaveInfo] val slaves = new HashMap[Int, SlaveInfo]
val actorToSlave = new HashMap[ActorRef, SlaveInfo] val actorToSlave = new HashMap[ActorRef, SlaveInfo]
val addressToSlave = new HashMap[Address, SlaveInfo]
override def preStart() { override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port) logInfo("Starting Spark master at spark://" + ip + ":" + port)
logInfo("Cluster ID: " + clusterId) logInfo("Cluster ID: " + clusterId)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi() startWebUi()
} }
...@@ -52,24 +61,37 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { ...@@ -52,24 +61,37 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
case RegisterSlave(host, slavePort, cores, memory) => { case RegisterSlave(host, slavePort, cores, memory) => {
logInfo("Registering slave %s:%d with %d cores, %s RAM".format( logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
host, slavePort, cores, Utils.memoryMegabytesToString(memory))) host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
val id = newSlaveId() val slave = addSlave(host, slavePort, cores, memory)
slaves(id) = new SlaveInfo(id, host, slavePort, cores, memory, sender) context.watch(sender) // This doesn't work with remote actors but helps for testing
actorToSlave(sender) = slaves(id) sender ! RegisteredSlave(clusterId, slave.id)
context.watch(sender)
sender ! RegisteredSlave(clusterId, id)
} }
case Terminated(actor) => { case RemoteClientDisconnected(transport, address) =>
logInfo("Remote client disconnected: " + address)
addressToSlave.get(address).foreach(s => removeSlave(s)) // Remove slave, if any, at address
case RemoteClientShutdown(transport, address) =>
logInfo("Remote client shutdown: " + address)
addressToSlave.get(address).foreach(s => removeSlave(s)) // Remove slave, if any, at address
case Terminated(actor) =>
logInfo("Slave disconnected: " + actor) logInfo("Slave disconnected: " + actor)
actorToSlave.get(actor) match { actorToSlave.get(actor).foreach(s => removeSlave(s)) // Remove slave, if any, at actor
case Some(slave) => }
logInfo("Removing slave " + slave.id)
slaves -= slave.id def addSlave(host: String, slavePort: Int, cores: Int, memory: Int): SlaveInfo = {
actorToSlave -= actor val slave = new SlaveInfo(newSlaveId(), host, slavePort, cores, memory, sender)
case None => slaves(slave.id) = slave
logError("Did not have any slave registered for " + actor) actorToSlave(sender) = slave
} addressToSlave(sender.path.address) = slave
} return slave
}
def removeSlave(slave: SlaveInfo) {
logInfo("Removing slave " + slave.id + " on " + slave.host + ":" + slave.port)
slaves -= slave.id
actorToSlave -= slave.actor
addressToSlave -= slave.actor.path.address
} }
def newClusterId(): String = { def newClusterId(): String = {
......
...@@ -41,11 +41,11 @@ class MasterArguments(args: Array[String]) { ...@@ -41,11 +41,11 @@ class MasterArguments(args: Array[String]) {
def printUsageAndExit(exitCode: Int) { def printUsageAndExit(exitCode: Int) {
System.err.println( System.err.println(
"Usage: spark-master [options]\n" + "Usage: spark-master [options]\n" +
"\n" + "\n" +
"Options:\n" + "Options:\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" + " -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" + " -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)") " --webui-port PORT Port for web UI (default: 8080)")
System.exit(exitCode) System.exit(exitCode)
} }
} }
\ No newline at end of file
package spark.deploy.worker package spark.deploy.worker
import scala.collection.mutable.HashMap
import akka.actor.{Terminated, ActorRef, Props, Actor} import akka.actor.{ActorRef, Terminated, Props, Actor}
import spark.{Logging, Utils} import akka.pattern.ask
import spark.util.AkkaUtils import akka.util.duration._
import java.text.SimpleDateFormat import spark.{SparkException, Logging, Utils}
import java.util.Date import spark.util.{IntParam, AkkaUtils}
import spark.deploy.{RegisteredSlave, RegisterSlave} import spark.deploy.{RegisterSlave, RegisteredSlave}
import akka.dispatch.Await
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int) class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
extends Actor with Logging { extends Actor with Logging {
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
var clusterId: String = null
var slaveId: Int = 0
var coresUsed = 0 var coresUsed = 0
var memoryUsed = 0 var memoryUsed = 0
...@@ -21,9 +28,32 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int) ...@@ -21,9 +28,32 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int)
override def preStart() { override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
ip, port, cores, Utils.memoryMegabytesToString(memory))) ip, port, cores, Utils.memoryMegabytesToString(memory)))
connectToMaster()
startWebUi() startWebUi()
} }
/**
 * Register this worker with the master named by `masterUrl`.
 *
 * `masterUrl` is matched against MASTER_REGEX (spark://host:port). On a match the
 * master actor is looked up at akka://spark@host:port/user/Master, a RegisterSlave
 * message carrying this worker's ip, port, cores and memory is sent to it, and the
 * worker subscribes to RemoteClientLifeCycleEvent on the system event stream.
 *
 * If the URL does not match, or an Exception is thrown during the handshake, the
 * problem is logged and the process exits with status 1.
 */
def connectToMaster() {
  // Performs the handshake against an already-built Akka actor path.
  def registerAt(masterActorPath: String) {
    master = context.actorFor(masterActorPath)
    master ! RegisterSlave(ip, port, cores, memory)
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    // Watching does not work for remote actors, but it is useful when testing
    context.watch(master)
  }

  masterUrl match {
    case MASTER_REGEX(hostName, portText) =>
      logInfo("Connecting to master spark://" + hostName + ":" + portText)
      val masterActorPath = "akka://spark@%s:%s/user/Master".format(hostName, portText)
      try {
        registerAt(masterActorPath)
      } catch {
        case e: Exception =>
          logError("Failed to connect to master", e)
          System.exit(1)
      }

    case _ =>
      // Anything that is not spark://host:port is rejected outright.
      logError("Invalid master URL: " + masterUrl)
      System.exit(1)
  }
}
def startWebUi() { def startWebUi() {
val webUi = new WorkerWebUI(context.system, self) val webUi = new WorkerWebUI(context.system, self)
try { try {
...@@ -36,13 +66,25 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int) ...@@ -36,13 +66,25 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int)
} }
override def receive = { override def receive = {
case RegisteredSlave(clusterId, slaveId) => { case RegisteredSlave(clusterId_, slaveId_) =>
logInfo("Registered with cluster ID " + clusterId + ", slave ID " + slaveId) this.clusterId = clusterId_
} this.slaveId = slaveId_
logInfo("Registered with master, cluster ID = " + clusterId + ", slave ID = " + slaveId)
case Terminated(actor) => { case RemoteClientDisconnected(_, _) =>
logError("Master disconnected!") masterDisconnected()
}
case RemoteClientShutdown(_, _) =>
masterDisconnected()
case Terminated(_) =>
masterDisconnected()
}
def masterDisconnected() {
// Not sure what to do here exactly, so just shut down for now.
logError("Connection to master failed! Shutting down.")
System.exit(1)
} }
} }
...@@ -51,7 +93,7 @@ object Worker { ...@@ -51,7 +93,7 @@ object Worker {
val args = new WorkerArguments(argStrings) val args = new WorkerArguments(argStrings)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port) val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
val actor = actorSystem.actorOf( val actor = actorSystem.actorOf(
Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory)), Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory, args.master)),
name = "Worker") name = "Worker")
actorSystem.awaitTermination() actorSystem.awaitTermination()
} }
......
...@@ -14,6 +14,7 @@ class WorkerArguments(args: Array[String]) { ...@@ -14,6 +14,7 @@ class WorkerArguments(args: Array[String]) {
var webUiPort = 8081 var webUiPort = 8081
var cores = inferDefaultCores() var cores = inferDefaultCores()
var memory = inferDefaultMemory() var memory = inferDefaultMemory()
var master: String = null
parse(args.toList) parse(args.toList)
...@@ -41,7 +42,17 @@ class WorkerArguments(args: Array[String]) { ...@@ -41,7 +42,17 @@ class WorkerArguments(args: Array[String]) {
case ("--help" | "-h") :: tail => case ("--help" | "-h") :: tail =>
printUsageAndExit(0) printUsageAndExit(0)
case Nil => {} case value :: tail =>
if (master != null) { // Two positional arguments were given
printUsageAndExit(1)
}
master = value
parse(tail)
case Nil =>
if (master == null) { // No positional argument was given
printUsageAndExit(1)
}
case _ => case _ =>
printUsageAndExit(1) printUsageAndExit(1)
...@@ -52,14 +63,16 @@ class WorkerArguments(args: Array[String]) { ...@@ -52,14 +63,16 @@ class WorkerArguments(args: Array[String]) {
*/ */
def printUsageAndExit(exitCode: Int) { def printUsageAndExit(exitCode: Int) {
System.err.println( System.err.println(
"Usage: spark-worker [options]\n" + "Usage: spark-worker [options] <master>\n" +
"\n" + "\n" +
"Options:\n" + "Master must be a URL of the form spark://hostname:port\n" +
" -c CORES, --cores CORES Number of cores to use\n" + "\n" +
" -m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)\n" + "Options:\n" +
" -i IP, --ip IP IP address or DNS name to listen on\n" + " -c CORES, --cores CORES Number of cores to use\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" + " -m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)\n" +
" --webui-port PORT Port for web UI (default: 8081)") " -i IP, --ip IP IP address or DNS name to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
System.exit(exitCode) System.exit(exitCode)
} }
......
...@@ -30,6 +30,7 @@ object AkkaUtils { ...@@ -30,6 +30,7 @@ object AkkaUtils {
akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
akka.remote.netty.hostname = "%s" akka.remote.netty.hostname = "%s"
akka.remote.netty.port = %d akka.remote.netty.port = %d
akka.remote.netty.connection-timeout = 1s
""".format(host, port)) """.format(host, port))
val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader) val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
...@@ -39,8 +40,6 @@ object AkkaUtils { ...@@ -39,8 +40,6 @@ object AkkaUtils {
val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
return (actorSystem, boundPort) return (actorSystem, boundPort)
return (null, 0)
} }
/** /**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment