From 3a58efa5a5da9a9a83bdaf0d4e5d4df6223e6a22 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Fri, 29 Jun 2012 16:01:36 -0700
Subject: [PATCH] Allow binding to a free port and change Akka logging to use
 SLF4J. Also fix various bugs in the previous code when running on Mesos.
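
Setting spark.master.port to 0 now means "bind to any free port": the master
lets Akka pick a port, then republishes whatever port was chosen back into
spark.master.port so executors can connect. A minimal standalone sketch of the
discovery mechanism (the "example" system name and the println are
illustrative, not part of this patch):

    import akka.actor.{ActorSystem, ActorSystemImpl}
    import akka.remote.RemoteActorRefProvider
    import com.typesafe.config.ConfigFactory

    val conf = ConfigFactory.parseString("""
      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
      akka.remote.netty.port = 0
    """)
    val system = ActorSystem("example", conf)
    // Akka doesn't yet expose the bound address publicly, so cast to internals:
    val boundPort = system.asInstanceOf[ActorSystemImpl]
      .provider.asInstanceOf[RemoteActorRefProvider]
      .transport.address.port.get
    println("Akka bound to port " + boundPort)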

---
 core/src/main/scala/spark/CacheTracker.scala  |  2 +-
 core/src/main/scala/spark/Executor.scala      |  2 +-
 .../main/scala/spark/MapOutputTracker.scala   |  2 +-
 core/src/main/scala/spark/SparkContext.scala  |  8 +++-
 core/src/main/scala/spark/SparkEnv.scala      | 38 +++++++++++++------
 .../spark/storage/BlockManagerMaster.scala    |  5 ++-
 6 files changed, 38 insertions(+), 19 deletions(-)
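
Note on the actor URL changes below: remotely registered actors live under the
/user guardian in Akka 2.x actor paths, so lookups in CacheTracker,
MapOutputTracker and BlockManagerMaster now take the form:

    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
    val ref = actorSystem.actorFor(url)

Akka's own log output is routed to SLF4J via the
akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] setting; this
assumes an SLF4J binding is on the classpath.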

diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 65e3803144..19870408d3 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -116,7 +116,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
     logInfo("Registered CacheTrackerActor actor")
     actor
   } else {
-    val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
     actorSystem.actorFor(url)
   }
 
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index af9eb9c878..3d70cf1737 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -43,7 +43,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
     RemoteActor.classLoader = getClass.getClassLoader
 
     // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(false, false)
+    env = SparkEnv.createFromSystemProperties(slaveInfo.getHostname(), 0, false, false) // isMaster = false, isLocal = false
     SparkEnv.set(env)
     // Old stuff that isn't yet using env
     Broadcast.initialize(false)
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d18ecb921d..0c97cd44a1 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -51,7 +51,7 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
     logInfo("Registered MapOutputTrackerActor actor")
     actor
   } else {
-    val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
     actorSystem.actorFor(url)
   }
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 0272040080..fc364b5307 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,17 @@ class SparkContext(
     System.setProperty("spark.master.host", Utils.localIpAddress)
   }
   if (System.getProperty("spark.master.port") == null) {
-    System.setProperty("spark.master.port", "7077")
+    System.setProperty("spark.master.port", "0")
   }
 
   private val isLocal = master.startsWith("local") // TODO: better check for local
 
   // Create the Spark execution environment (cache, map output tracker, etc)
-  val env = SparkEnv.createFromSystemProperties(true, isLocal) 
+  val env = SparkEnv.createFromSystemProperties(
+    System.getProperty("spark.master.host"),
+    System.getProperty("spark.master.port").toInt,
+    true, // isMaster
+    isLocal)
   SparkEnv.set(env)
   Broadcast.initialize(true)
 
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 974cb5f401..5dcf25f997 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,6 +1,8 @@
 package spark
 
 import akka.actor.ActorSystem
+import akka.actor.ActorSystemImpl
+import akka.remote.RemoteActorRefProvider
 
 import com.typesafe.config.ConfigFactory
 
@@ -36,21 +38,33 @@ object SparkEnv {
     env.get()
   }
 
-  def createFromSystemProperties(isMaster: Boolean, isLocal: Boolean): SparkEnv = {
-    val host = System.getProperty("spark.master.host")
-    val port = System.getProperty("spark.master.port").toInt
-    if (port == 0) {
-      throw new IllegalArgumentException("Setting spark.master.port to 0 is not yet supported")
-    }
+  def createFromSystemProperties(
+      hostname: String,
+      port: Int,
+      isMaster: Boolean,
+      isLocal: Boolean
+    ): SparkEnv = {
+
     val akkaConf = ConfigFactory.parseString("""
-        akka.daemonic = on
-        akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-        akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
-        akka.remote.netty.hostname = "%s"
-        akka.remote.netty.port = %d
-      """.format(host, port))
+      akka.daemonic = on
+      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+      akka.remote.netty.hostname = "%s"
+      akka.remote.netty.port = %d
+      """.format(hostname, port))
+
     val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
 
+    // Bit of a hack: if this is the master and our port was 0 (meaning bind to any free port),
+    // figure out which port Akka actually bound to and set spark.master.port to it.
+    // Unfortunately Akka doesn't yet expose the bound address publicly, so we cast to internals.
+    if (isMaster && port == 0) {
+      val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+      val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+      System.setProperty("spark.master.port", boundPort.toString)
+    }
+
     val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
     val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
     
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 5fe0e22dd0..97a5b0cb45 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -348,8 +348,9 @@ object BlockManagerMaster extends Logging {
         Props(new BlockManagerMaster(isLocal)), name = AKKA_ACTOR_NAME)
       logInfo("Registered BlockManagerMaster Actor")
     } else {
-      val url = "akka://spark@%s:%s/%s".format(
+      val url = "akka://spark@%s:%s/user/%s".format(
         DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+      logInfo("Connecting to BlockManagerMaster: " + url)
       masterActor = actorSystem.actorFor(url)
     }
   }
@@ -425,7 +426,7 @@ object BlockManagerMaster extends Logging {
     
     try {
       communicate(msg)
-      logInfo("Heartbeat sent successfully")
+      logDebug("Heartbeat sent successfully")
       logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
       return true
     } catch {
-- 
GitLab