diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 65e38031448d468ffd9be83c0dd23f611bd4f2b9..19870408d37f770c9c2f7b003a1d7dded133dc26 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -116,7 +116,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
     logInfo("Registered CacheTrackerActor actor")
     actor
   } else {
-    val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
     actorSystem.actorFor(url)
   }
 
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index af9eb9c878ede5fd39441c413bf72c56524b0b5f..3d70cf1737f5e44ca8c74d7bac6f18aa390a195a 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -43,7 +43,7 @@ class Executor extends org.apache.mesos.Executor with Logging {
     RemoteActor.classLoader = getClass.getClassLoader
 
     // Initialize Spark environment (using system properties read above)
-    env = SparkEnv.createFromSystemProperties(false, false)
+    env = SparkEnv.createFromSystemProperties(slaveInfo.getHostname(), 0, false, false)
     SparkEnv.set(env)
     // Old stuff that isn't yet using env
     Broadcast.initialize(false)
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index d18ecb921dccae19ddaf970c86a62494ca1e6a6c..0c97cd44a1b0ff2306eeb02b62d6cb2ed099733f 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -51,7 +51,7 @@ class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolean) extends Logg
     logInfo("Registered MapOutputTrackerActor actor")
     actor
   } else {
-    val url = "akka://spark@%s:%s/%s".format(ip, port, actorName)
+    val url = "akka://spark@%s:%s/user/%s".format(ip, port, actorName)
     actorSystem.actorFor(url)
   }
 
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 02720400801f17efb97bc1fe62d2777b4051d6b4..fc364b530707601694e712b93aa6bb4b6725e111 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,17 @@ class SparkContext(
     System.setProperty("spark.master.host", Utils.localIpAddress)
   }
   if (System.getProperty("spark.master.port") == null) {
-    System.setProperty("spark.master.port", "7077")
+    System.setProperty("spark.master.port", "0")
   }
 
   private val isLocal = master.startsWith("local") // TODO: better check for local
 
   // Create the Spark execution environment (cache, map output tracker, etc)
-  val env = SparkEnv.createFromSystemProperties(true, isLocal)
+  val env = SparkEnv.createFromSystemProperties(
+    System.getProperty("spark.master.host"),
+    System.getProperty("spark.master.port").toInt,
+    true,
+    isLocal)
   SparkEnv.set(env)
   Broadcast.initialize(true)
 
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 974cb5f401f6cae90b016f239b4862bb65a1fbb1..5dcf25f997b0323e42c0360d5c22234c11d41223 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,6 +1,8 @@
 package spark
 
 import akka.actor.ActorSystem
+import akka.actor.ActorSystemImpl
+import akka.remote.RemoteActorRefProvider
 
 import com.typesafe.config.ConfigFactory
 
@@ -36,21 +38,33 @@ object SparkEnv {
     env.get()
   }
 
-  def createFromSystemProperties(isMaster: Boolean, isLocal: Boolean): SparkEnv = {
-    val host = System.getProperty("spark.master.host")
-    val port = System.getProperty("spark.master.port").toInt
-    if (port == 0) {
-      throw new IllegalArgumentException("Setting spark.master.port to 0 is not yet supported")
-    }
+  def createFromSystemProperties(
+      hostname: String,
+      port: Int,
+      isMaster: Boolean,
+      isLocal: Boolean
+    ) : SparkEnv = {
+
     val akkaConf = ConfigFactory.parseString("""
-      akka.daemonic = on
-      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
-      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
-      akka.remote.netty.hostname = "%s"
-      akka.remote.netty.port = %d
-      """.format(host, port))
+      akka.daemonic = on
+      akka.event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
+      akka.actor.provider = "akka.remote.RemoteActorRefProvider"
+      akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
+      akka.remote.netty.hostname = "%s"
+      akka.remote.netty.port = %d
+      """.format(hostname, port))
+
     val actorSystem = ActorSystem("spark", akkaConf, getClass.getClassLoader)
 
+    // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
+    // figure out which port number Akka actually bound to and set spark.master.port to it.
+    // Unfortunately Akka doesn't yet provide an API for this except if you cast objects as below.
+    if (isMaster && port == 0) {
+      val provider = actorSystem.asInstanceOf[ActorSystemImpl].provider
+      val port = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
+      System.setProperty("spark.master.port", port.toString)
+    }
+
     val serializerClass = System.getProperty("spark.serializer", "spark.KryoSerializer")
     val serializer = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 5fe0e22dd0d921e22da2f8c7bfecd1081663fe1f..97a5b0cb45eca0bbfb96cf2310c4e7e5a4f30221 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -348,8 +348,9 @@ object BlockManagerMaster extends Logging {
         Props(new BlockManagerMaster(isLocal)), name = AKKA_ACTOR_NAME)
       logInfo("Registered BlockManagerMaster Actor")
     } else {
-      val url = "akka://spark@%s:%s/%s".format(
+      val url = "akka://spark@%s:%s/user/%s".format(
        DEFAULT_MASTER_IP, DEFAULT_MASTER_PORT, AKKA_ACTOR_NAME)
+      logInfo("Connecting to BlockManagerMaster: " + url)
       masterActor = actorSystem.actorFor(url)
     }
   }
@@ -425,7 +426,7 @@
 
     try {
       communicate(msg)
-      logInfo("Heartbeat sent successfully")
+      logDebug("Heartbeat sent successfully")
       logDebug("Got in syncHeartBeat 1 " + tmp + " 1 " + Utils.getUsedTimeMs(startTimeMs))
       return true
     } catch {
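
The pattern behind these changes, sketched below: Akka's remoting layer can bind to port 0 and pick any free port, the port it actually chose is (in Akka 2.0) only reachable through the `ActorSystemImpl`/`RemoteActorRefProvider` cast now used in `SparkEnv`, and actors created with `actorOf` live under the `/user` guardian, which is why the lookup URLs in `CacheTracker`, `MapOutputTracker` and `BlockManagerMaster` gain a `/user/` segment. The snippet is an illustrative, standalone sketch only, not code from the patch; it assumes Akka 2.0.x, and the loopback hostname, the `echo` actor and the object name are made up for the example.

```scala
import akka.actor.{Actor, ActorSystem, ActorSystemImpl, Props}
import akka.remote.RemoteActorRefProvider
import com.typesafe.config.ConfigFactory

// Standalone sketch (not part of the patch): bind a remote-enabled ActorSystem to a
// free port, recover the port Akka actually chose, and look an actor up via /user/.
object AkkaPortZeroSketch extends App {
  val conf = ConfigFactory.parseString("""
    akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
    akka.remote.netty.hostname = "127.0.0.1"
    akka.remote.netty.port = 0
    """)
  val system = ActorSystem("spark", conf, getClass.getClassLoader)

  // Same cast the patch uses in SparkEnv: Akka 2.0 exposes no public API for the
  // bound port, so reach into the RemoteActorRefProvider's transport address.
  val boundPort = system.asInstanceOf[ActorSystemImpl]
    .provider.asInstanceOf[RemoteActorRefProvider]
    .transport.address.port.get

  // Actors created with actorOf() sit under the /user guardian in Akka 2.x, hence
  // the "/user/" segment in the remote lookup paths.
  class EchoActor extends Actor {
    def receive = { case msg => sender ! msg }
  }
  system.actorOf(Props(new EchoActor), name = "echo")

  val url = "akka://spark@127.0.0.1:%d/user/echo".format(boundPort)
  println("Actor reachable at " + url)
  val ref = system.actorFor(url) // path-based lookup; deprecated in later Akka versions

  system.shutdown()
}
```

Defaulting `spark.master.port` to 0 rather than 7077 presumably lets several masters (for example in tests) share one machine without port clashes, at the cost of the unsupported cast above until Akka exposes the bound address directly.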