diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 2a49dfb4867febf0d0099b0c7040e08a7823f1df..127be9e7d49d7c8ef6a572e028a6c7b69428affe 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else { addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) context.watch(sender) // This doesn't work with remote actors but helps for testing - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort) + sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort) schedule() } } diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala index 065b62a4b20dcf91cbff9165e46ceeafe0ad57fc..50e93cd00f932117f0c614df68d1fdb6ea904330 100644 --- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala @@ -15,21 +15,24 @@ import spark.ui.JettyUtils._ * Web UI server for the standalone master. */ private[spark] -class MasterWebUI(val master: ActorRef) extends Logging { +class MasterWebUI(val master: ActorRef, requestedPort: Option[Int]) extends Logging { implicit val timeout = Duration.create( System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") val host = Utils.localHostName() - val port = Option(System.getProperty("master.ui.port")) - .getOrElse(MasterWebUI.DEFAULT_PORT).toInt + val port = requestedPort.getOrElse( + System.getProperty("master.ui.port", MasterWebUI.DEFAULT_PORT).toInt) + var server: Option[Server] = None + var boundPort: Option[Int] = None val applicationPage = new ApplicationPage(this) val indexPage = new IndexPage(this) def start() { try { - val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) server = Some(srv) + boundPort = Some(bPort) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 28553b6c02d57049229c298e5eaa58b379ea71e8..b6a26245fc9feb4d7e0f47bb9ce81b596192207c 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -45,7 +45,7 @@ private[spark] class Worker( val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } - val webUi = new WorkerWebUI(self, workDir) + val webUi = new WorkerWebUI(self, workDir, Some(webUiPort)) var coresUsed = 0 var memoryUsed = 0 @@ -77,14 +77,14 @@ private[spark] class Worker( sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() - connectToMaster() webUi.start() + connectToMaster() } def connectToMaster() { logInfo("Connecting to master " + masterUrl) master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, host, port, cores, memory, webUiPort, publicAddress) + master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index ee3889192d7bec0dfdaf3639dcb7838f9f993241..2615458deeb6384f7e19e349f07fe6c02ac66b93 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -19,13 +19,16 @@ import spark.ui.JettyUtils._ * Web UI server for the standalone worker. */ private[spark] -class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { +class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None) + extends Logging { implicit val timeout = Timeout( Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) val host = Utils.localHostName() - val port = Option(System.getProperty("wroker.ui.port")) - .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt + val port = requestedPort.getOrElse( + System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) + var server: Option[Server] = None + var boundPort: Option[Int] = None val indexPage = new IndexPage(this) @@ -38,9 +41,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging { def start() { try { - val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) server = Some(srv) - logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) + boundPort = Some(bPort) + logInfo("Started Worker web UI at http://%s:%d".format(host, bPort)) } catch { case e: Exception => logError("Failed to create Worker JettyUtils", e) diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala index 85d6a7e8674a4dd5bf0e85f953c428a699805dd9..1f665cbb42a36c71f9b049dc31ddb8952437437d 100644 --- a/core/src/main/scala/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -95,7 +95,7 @@ private[spark] object JettyUtils extends Logging { val server = new Server(currentPort) server.setHandler(handlerList) Try { server.start() } match { - case s: Success[_] => (server, currentPort) + case s: Success[_] => (server, server.getConnectors.head.getLocalPort) case f: Failure[_] => server.stop() logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))