Commit d5daaab3 authored by Matei Zaharia

Merge pull request #442 from stephenh/fixsystemnames

Fix createActorSystem not actually using the systemName parameter.
parents 9163c370 28e0cb9f
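
For context, the bug being fixed: AkkaUtils.createActorSystem accepted a name argument but hard-coded "spark" when constructing the ActorSystem, so every daemon ran under the same system name no matter what its caller requested. A minimal stand-alone sketch of the symptom (this is not Spark code; the object name and printed value are only illustrative):

    import akka.actor.ActorSystem

    object NameMismatchSketch {
      def main(args: Array[String]) {
        // Before this patch, createActorSystem effectively did this regardless of
        // the name the caller passed in:
        val system = ActorSystem("spark")
        println(system.name)  // always "spark", even if "sparkMaster" was requested
        // Remote actor paths embed the system name (akka://<systemName>@host:port/user/<actor>),
        // so a path built with the requested name never matches this system and Akka
        // silently drops the messages, hence the scaladoc note added to AkkaUtils below.
        system.shutdown()
      }
    }

The hunks below thread the name through createActorSystem and move URL construction behind a single helper so the system name and the actor paths stay consistent.
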
@@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer
 private[spark]
 class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
-  val localIpAddress = Utils.localIpAddress
+  private val localIpAddress = Utils.localIpAddress
+  private val masterActorSystems = ArrayBuffer[ActorSystem]()
+  private val workerActorSystems = ArrayBuffer[ActorSystem]()
-  var masterActor : ActorRef = _
-  var masterActorSystem : ActorSystem = _
-  var masterPort : Int = _
-  var masterUrl : String = _
-  val workerActorSystems = ArrayBuffer[ActorSystem]()
-  val workerActors = ArrayBuffer[ActorRef]()
-  def start() : String = {
+  def start(): String = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
     /* Start the Master */
-    val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
-    masterActorSystem = actorSystem
-    masterUrl = "spark://" + localIpAddress + ":" + masterPort
-    masterActor = masterActorSystem.actorOf(
-      Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
+    val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0)
+    masterActorSystems += masterSystem
+    val masterUrl = "spark://" + localIpAddress + ":" + masterPort
-    /* Start the Slaves */
+    /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val (actorSystem, boundPort) =
-        AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
-      workerActorSystems += actorSystem
-      val actor = actorSystem.actorOf(
-        Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
-        name = "Worker")
-      workerActors += actor
+      val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker,
+        memoryPerWorker, masterUrl, null, Some(workerNum))
+      workerActorSystems += workerSystem
     }
     return masterUrl
@@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     // Stop the workers before the master so they don't get upset that it disconnected
     workerActorSystems.foreach(_.shutdown())
     workerActorSystems.foreach(_.awaitTermination())
-    masterActorSystem.shutdown()
-    masterActorSystem.awaitTermination()
+    masterActorSystems.foreach(_.shutdown())
+    masterActorSystems.foreach(_.awaitTermination())
   }
 }
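
A hedged sketch of how the patched LocalSparkCluster is typically driven (parameter values are made up, and the stop() method name is assumed from the shutdown code in the hunk above):

    // Spin up an in-process standalone cluster: one master plus two workers.
    val cluster = new LocalSparkCluster(numWorkers = 2, coresPerWorker = 1, memoryPerWorker = 512)
    val masterUrl = cluster.start()  // e.g. "spark://<localIpAddress>:<masterPort>"
    // With this change the three actor systems are genuinely named
    // "sparkMaster", "sparkWorker1" and "sparkWorker2" rather than all being "spark".
    cluster.stop()
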
@@ -9,6 +9,7 @@ import spark.{SparkException, Logging}
 import akka.remote.RemoteClientLifeCycleEvent
 import akka.remote.RemoteClientShutdown
 import spark.deploy.RegisterJob
+import spark.deploy.master.Master
 import akka.remote.RemoteClientDisconnected
 import akka.actor.Terminated
 import akka.dispatch.Await
@@ -24,26 +25,18 @@ private[spark] class Client(
     listener: ClientListener)
   extends Logging {
-  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
   var actor: ActorRef = null
   var jobId: String = null
-  if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
-    throw new SparkException("Invalid master URL: " + masterUrl)
-  }
   class ClientActor extends Actor with Logging {
     var master: ActorRef = null
     var masterAddress: Address = null
     var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
     override def preStart() {
-      val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
-      logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
-      val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
+      logInfo("Connecting to master " + masterUrl)
       try {
-        master = context.actorFor(akkaUrl)
+        master = context.actorFor(Master.toAkkaUrl(masterUrl))
         masterAddress = master.path.address
         master ! RegisterJob(jobDescription)
         context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
......
@@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
 }
 private[spark] object Master {
+  private val systemName = "sparkMaster"
+  private val actorName = "Master"
+  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
-    val actor = actorSystem.actorOf(
-      Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
+    val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort)
     actorSystem.awaitTermination()
   }
+  /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+  def toAkkaUrl(sparkUrl: String): String = {
+    sparkUrl match {
+      case sparkUrlRegex(host, port) =>
+        "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+      case _ =>
+        throw new SparkException("Invalid master URL: " + sparkUrl)
+    }
+  }
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+    (actorSystem, boundPort)
+  }
 }
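
The new toAkkaUrl helper centralizes the spark:// to akka:// translation for clients and workers; a usage sketch with a made-up host and port (the expected result follows directly from the format string above):

    import spark.deploy.master.Master

    val akkaUrl = Master.toAkkaUrl("spark://192.168.0.10:7077")
    // akkaUrl == "akka://sparkMaster@192.168.0.10:7077/user/Master"

    // Malformed URLs now fail fast in one place instead of in each caller:
    // Master.toAkkaUrl("192.168.0.10:7077")  // throws SparkException("Invalid master URL: ...")
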
 package spark.deploy.worker
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor}
+import akka.actor.{ActorRef, Props, Actor, ActorSystem}
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
@@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected
 import spark.deploy.RegisterWorker
 import spark.deploy.LaunchExecutor
 import spark.deploy.RegisterWorkerFailed
+import spark.deploy.master.Master
 import akka.actor.Terminated
 import java.io.File
@@ -27,7 +28,6 @@ private[spark] class Worker(
   extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
-  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
   var master: ActorRef = null
   var masterWebUiUrl : String = ""
@@ -48,11 +48,7 @@ private[spark] class Worker(
   def memoryFree: Int = memory - memoryUsed
   def createWorkDir() {
-    workDir = if (workDirPath != null) {
-      new File(workDirPath)
-    } else {
-      new File(sparkHome, "work")
-    }
+    workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
     try {
       if (!workDir.exists() && !workDir.mkdirs()) {
         logError("Failed to create work directory " + workDir)
@@ -68,8 +64,7 @@ private[spark] class Worker(
   override def preStart() {
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       ip, port, cores, Utils.memoryMegabytesToString(memory)))
-    val envVar = System.getenv("SPARK_HOME")
-    sparkHome = new File(if (envVar == null) "." else envVar)
+    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     connectToMaster()
@@ -77,24 +72,15 @@ private[spark] class Worker(
   }
   def connectToMaster() {
-    masterUrl match {
-      case MASTER_REGEX(masterHost, masterPort) => {
-        logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
-        val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
-        try {
-          master = context.actorFor(akkaUrl)
-          master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
-          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-          context.watch(master) // Doesn't work with remote actors, but useful for testing
-        } catch {
-          case e: Exception =>
-            logError("Failed to connect to master", e)
-            System.exit(1)
-        }
-      }
-      case _ =>
-        logError("Invalid master URL: " + masterUrl)
+    logInfo("Connecting to master " + masterUrl)
+    try {
+      master = context.actorFor(Master.toAkkaUrl(masterUrl))
+      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+      context.watch(master) // Doesn't work with remote actors, but useful for testing
+    } catch {
+      case e: Exception =>
+        logError("Failed to connect to master", e)
+        System.exit(1)
+    }
   }
@@ -183,11 +169,19 @@ private[spark] class Worker(
 private[spark] object Worker {
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
-    val actor = actorSystem.actorOf(
-      Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory,
-        args.master, args.workDir)),
-      name = "Worker")
+    val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores,
+      args.memory, args.master, args.workDir)
     actorSystem.awaitTermination()
   }
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
+    masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
+      masterUrl, workDir)), name = "Worker")
+    (actorSystem, boundPort)
+  }
 }
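
A sketch of how the new Worker.startSystemAndActor is called once per worker by LocalSparkCluster (all values here are illustrative; the standalone Worker.main passes no workerNumber, so its ActorSystem is named plain "sparkWorker"):

    import spark.deploy.worker.Worker

    val (workerSystem, boundPort) = Worker.startSystemAndActor(
      "127.0.0.1",               // host
      0,                         // port (0 lets Akka pick a free port)
      0,                         // webUiPort
      1,                         // cores
      512,                       // memory in MB
      "spark://127.0.0.1:7077",  // masterUrl (made up)
      null,                      // workDir: null falls back to <sparkHome>/work
      Some(1))                   // workerNumber, so the ActorSystem is named "sparkWorker1"
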
@@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster(
   val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
   val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
   val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
-  val DEFAULT_MANAGER_IP: String = Utils.localHostName()
   val timeout = 10.seconds
   var driverActor: ActorRef = {
......
@@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException
  * Various utility classes for working with Akka.
  */
 private[spark] object AkkaUtils {
+  /**
+   * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
+   * ActorSystem itself and its port (which is hard to get from Akka).
+   *
+   * Note: the `name` parameter is important, as even if a client sends a message to right
+   * host + port, if the system name is incorrect, Akka will drop the message.
+   */
   def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
     val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
@@ -42,7 +46,7 @@ private[spark] object AkkaUtils {
       akka.actor.default-dispatcher.throughput = %d
       """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
-    val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+    val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
     // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
     // hack because Akka doesn't let you figure out the port through the public API yet.
......
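
To make the note in that scaladoc concrete, here is a small sketch (invented values, mirroring but not taken from Master.toAkkaUrl) of how the system name ends up inside every remote actor path, which is why a wrong name makes Akka drop messages without any error:

    val systemName = "sparkMaster"
    val actorName = "Master"
    val (host, port) = ("192.168.0.10", 7077)
    val path = "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
    println(path)  // akka://sparkMaster@192.168.0.10:7077/user/Master
    // The leading "sparkMaster" must equal the name passed to ActorSystem(name, ...).
    // Before this patch the receiving system was always named "spark", so a lookup of
    // this path resolved to nothing and messages such as RegisterWorker or RegisterJob
    // vanished silently.
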