From 28e0cb9f312b7fb1b0236fd15ba0dd2f423e826d Mon Sep 17 00:00:00 2001
From: Stephen Haberman <stephen@exigencecorp.com>
Date: Sat, 2 Feb 2013 01:11:37 -0600
Subject: [PATCH] Fix createActorSystem not actually using the systemName
 parameter.

This meant all system names were "spark", which worked, but didn't
lead to the most intuitive log output.

This fixes createActorSystem to use the passed system name, and
refactors Master/Worker to encapsulate their system/actor names
instead of having the clients guess at them.

Note that the driver system name, "spark", is left as is, and is
still repeated a few times, but that seems like a separate issue.
---
 .../spark/deploy/LocalSparkCluster.scala      | 38 +++++-------
 .../scala/spark/deploy/client/Client.scala    | 13 +----
 .../scala/spark/deploy/master/Master.scala    | 24 +++++++-
 .../scala/spark/deploy/worker/Worker.scala    | 58 +++++++++----------
 .../spark/storage/BlockManagerMaster.scala    |  2 -
 .../src/main/scala/spark/util/AkkaUtils.scala |  6 +-
 6 files changed, 68 insertions(+), 73 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index 2836574ecb..22319a96ca 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -18,35 +18,23 @@ import scala.collection.mutable.ArrayBuffer
 private[spark]
 class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging {
   
-  val localIpAddress = Utils.localIpAddress
+  private val localIpAddress = Utils.localIpAddress
+  private val masterActorSystems = ArrayBuffer[ActorSystem]()
+  private val workerActorSystems = ArrayBuffer[ActorSystem]()
   
-  var masterActor : ActorRef = _
-  var masterActorSystem : ActorSystem = _
-  var masterPort : Int = _
-  var masterUrl : String = _
-  
-  val workerActorSystems = ArrayBuffer[ActorSystem]()
-  val workerActors = ArrayBuffer[ActorRef]()
-  
-  def start() : String = {
+  def start(): String = {
     logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
 
     /* Start the Master */
-    val (actorSystem, masterPort) = AkkaUtils.createActorSystem("sparkMaster", localIpAddress, 0)
-    masterActorSystem = actorSystem
-    masterUrl = "spark://" + localIpAddress + ":" + masterPort
-    masterActor = masterActorSystem.actorOf(
-      Props(new Master(localIpAddress, masterPort, 0)), name = "Master")
+    val (masterSystem, masterPort) = Master.startSystemAndActor(localIpAddress, 0, 0)
+    masterActorSystems += masterSystem
+    val masterUrl = "spark://" + localIpAddress + ":" + masterPort
 
-    /* Start the Slaves */
+    /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
-      val (actorSystem, boundPort) = 
-        AkkaUtils.createActorSystem("sparkWorker" + workerNum, localIpAddress, 0)
-      workerActorSystems += actorSystem
-      val actor = actorSystem.actorOf(
-        Props(new Worker(localIpAddress, boundPort, 0, coresPerWorker, memoryPerWorker, masterUrl)),
-              name = "Worker")
-      workerActors += actor
+      val (workerSystem, _) = Worker.startSystemAndActor(localIpAddress, 0, 0, coresPerWorker,
+        memoryPerWorker, masterUrl, null, Some(workerNum))
+      workerActorSystems += workerSystem
     }
 
     return masterUrl
@@ -57,7 +45,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
     // Stop the workers before the master so they don't get upset that it disconnected
     workerActorSystems.foreach(_.shutdown())
     workerActorSystems.foreach(_.awaitTermination())
-    masterActorSystem.shutdown()
-    masterActorSystem.awaitTermination()
+    masterActorSystems.foreach(_.shutdown())
+    masterActorSystems.foreach(_.awaitTermination())
   }
 }
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 90fe9508cd..a63eee1233 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -9,6 +9,7 @@ import spark.{SparkException, Logging}
 import akka.remote.RemoteClientLifeCycleEvent
 import akka.remote.RemoteClientShutdown
 import spark.deploy.RegisterJob
+import spark.deploy.master.Master
 import akka.remote.RemoteClientDisconnected
 import akka.actor.Terminated
 import akka.dispatch.Await
@@ -24,26 +25,18 @@ private[spark] class Client(
     listener: ClientListener)
   extends Logging {
 
-  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
-
   var actor: ActorRef = null
   var jobId: String = null
 
-  if (MASTER_REGEX.unapplySeq(masterUrl) == None) {
-    throw new SparkException("Invalid master URL: " + masterUrl)
-  }
-
   class ClientActor extends Actor with Logging {
     var master: ActorRef = null
     var masterAddress: Address = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
 
     override def preStart() {
-      val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
-      logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
-      val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
+      logInfo("Connecting to master " + masterUrl)
       try {
-        master = context.actorFor(akkaUrl)
+        master = context.actorFor(Master.toAkkaUrl(masterUrl))
         masterAddress = master.path.address
         master ! RegisterJob(jobDescription)
         context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c618e87cdd..92e7914b1b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -262,11 +262,29 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
 }
 
 private[spark] object Master {
+  private val systemName = "sparkMaster"
+  private val actorName = "Master"
+  private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
-    val actor = actorSystem.actorOf(
-      Props(new Master(args.ip, boundPort, args.webUiPort)), name = "Master")
+    val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort)
     actorSystem.awaitTermination()
   }
+
+  /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
+  def toAkkaUrl(sparkUrl: String): String = {
+    sparkUrl match {
+      case sparkUrlRegex(host, port) =>
+        "akka://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
+      case _ =>
+        throw new SparkException("Invalid master URL: " + sparkUrl)
+    }
+  }
+
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
+    (actorSystem, boundPort)
+  }
 }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 8b41620d98..2219dd6262 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,7 +1,7 @@
 package spark.deploy.worker
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import akka.actor.{ActorRef, Props, Actor}
+import akka.actor.{ActorRef, Props, Actor, ActorSystem}
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
@@ -13,6 +13,7 @@ import akka.remote.RemoteClientDisconnected
 import spark.deploy.RegisterWorker
 import spark.deploy.LaunchExecutor
 import spark.deploy.RegisterWorkerFailed
+import spark.deploy.master.Master
 import akka.actor.Terminated
 import java.io.File
 
@@ -27,7 +28,6 @@ private[spark] class Worker(
   extends Actor with Logging {
 
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For worker and executor IDs
-  val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
 
   var master: ActorRef = null
   var masterWebUiUrl : String = ""
@@ -48,11 +48,7 @@ private[spark] class Worker(
   def memoryFree: Int = memory - memoryUsed
 
   def createWorkDir() {
-    workDir = if (workDirPath != null) {
-      new File(workDirPath)
-    } else {
-      new File(sparkHome, "work")
-    }
+    workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
     try {
       if (!workDir.exists() && !workDir.mkdirs()) {
         logError("Failed to create work directory " + workDir)
@@ -68,8 +64,7 @@ private[spark] class Worker(
   override def preStart() {
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       ip, port, cores, Utils.memoryMegabytesToString(memory)))
-    val envVar = System.getenv("SPARK_HOME")
-    sparkHome = new File(if (envVar == null) "." else envVar)
+    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     connectToMaster()
@@ -77,24 +72,15 @@ private[spark] class Worker(
   }
 
   def connectToMaster() {
-    masterUrl match {
-      case MASTER_REGEX(masterHost, masterPort) => {
-        logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
-        val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
-        try {
-          master = context.actorFor(akkaUrl)
-          master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
-          context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
-          context.watch(master) // Doesn't work with remote actors, but useful for testing
-        } catch {
-          case e: Exception =>
-            logError("Failed to connect to master", e)
-            System.exit(1)
-        }
-      }
-
-      case _ =>
-        logError("Invalid master URL: " + masterUrl)
+    logInfo("Connecting to master " + masterUrl)
+    try {
+      master = context.actorFor(Master.toAkkaUrl(masterUrl))
+      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort, publicAddress)
+      context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+      context.watch(master) // Doesn't work with remote actors, but useful for testing
+    } catch {
+      case e: Exception =>
+        logError("Failed to connect to master", e)
         System.exit(1)
     }
   }
@@ -183,11 +169,19 @@ private[spark] class Worker(
 private[spark] object Worker {
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", args.ip, args.port)
-    val actor = actorSystem.actorOf(
-      Props(new Worker(args.ip, boundPort, args.webUiPort, args.cores, args.memory,
-        args.master, args.workDir)),
-      name = "Worker")
+    val (actorSystem, _) = startSystemAndActor(args.ip, args.port, args.webUiPort, args.cores,
+      args.memory, args.master, args.workDir)
     actorSystem.awaitTermination()
   }
+
+  def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
+    masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+    // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
+    val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+    val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
+      masterUrl, workDir)), name = "Worker")
+    (actorSystem, boundPort)
+  }
+
 }
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 36398095a2..7be6b9fa87 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -27,8 +27,6 @@ private[spark] class BlockManagerMaster(
   val AKKA_RETRY_INTERVAL_MS: Int = System.getProperty("spark.akka.retry.wait", "3000").toInt
 
   val DRIVER_AKKA_ACTOR_NAME = "BlockMasterManager"
-  val SLAVE_AKKA_ACTOR_NAME = "BlockSlaveManager"
-  val DEFAULT_MANAGER_IP: String = Utils.localHostName()
 
   val timeout = 10.seconds
   var driverActor: ActorRef = {
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index e0fdeffbc4..3a3626e8a0 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -18,9 +18,13 @@ import java.util.concurrent.TimeoutException
  * Various utility classes for working with Akka.
  */
 private[spark] object AkkaUtils {
+
   /**
    * Creates an ActorSystem ready for remoting, with various Spark features. Returns both the
    * ActorSystem itself and its port (which is hard to get from Akka).
+   *
+   * Note: the `name` parameter is important, as even if a client sends a message to right
+   * host + port, if the system name is incorrect, Akka will drop the message.
    */
   def createActorSystem(name: String, host: String, port: Int): (ActorSystem, Int) = {
     val akkaThreads = System.getProperty("spark.akka.threads", "4").toInt
@@ -41,7 +45,7 @@ private[spark] object AkkaUtils {
       akka.actor.default-dispatcher.throughput = %d
       """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize))
 
-    val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
+    val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
 
     // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
     // hack because Akka doesn't let you figure out the port through the public API yet.
-- 
GitLab