diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c31619db279ddc7c97f3d51c8f7c446b7ed7ba59..1cfff5e565d173c730e65ff301b26b59d7c31085 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -127,4 +127,8 @@ private[deploy] object DeployMessages { case object CheckForWorkerTimeOut + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) + } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 78e3747ad81669a4ff3b6f5080067d7070860f94..10161c82041b91c622f6dc3d5feb4ded72a96352 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0) + val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7cf0a7754f78cdf05e5c50252fe5cd98f64322e3..bde59905bc7646ab20170500c331cf77ecdf294a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -17,15 +17,18 @@ package org.apache.spark.deploy.master -import java.text.SimpleDateFormat import java.util.Date +import java.text.SimpleDateFormat import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ import akka.actor.Terminated +import akka.dispatch.Await +import akka.pattern.ask import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} import akka.util.duration._ +import akka.util.Timeout import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -33,6 +36,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} +import akka.util.{Duration, Timeout} private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case CheckForWorkerTimeOut => { timeOutDeadWorkers() } + + case RequestWebUIPort => { + sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1)) + } } /** @@ -364,7 +372,7 @@ private[spark] object Master { def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) + val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort) actorSystem.awaitTermination() } @@ -378,9 +386,14 @@ private[spark] object Master { } } - def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) - (actorSystem, boundPort) + val timeoutDuration = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + implicit val timeout = Timeout(timeoutDuration) + val respFuture = actor ? RequestWebUIPort // ask pattern + val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse] + (actorSystem, boundPort, resp.webUIBoundPort) } }