From f7389330c35a6372c4c7b455b96b8498b9a9da27 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 24 Jun 2013 16:51:52 -0700
Subject: [PATCH] Allowing for requested port on construction

---
 .../main/scala/spark/deploy/master/Master.scala    |  2 +-
 .../scala/spark/deploy/master/ui/MasterWebUI.scala | 11 +++++++----
 .../main/scala/spark/deploy/worker/Worker.scala    |  6 +++---
 .../scala/spark/deploy/worker/ui/WorkerWebUI.scala | 14 +++++++++-----
 core/src/main/scala/spark/ui/JettyUtils.scala      |  2 +-
 5 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 2a49dfb486..127be9e7d4 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       } else {
         addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
         context.watch(sender)  // This doesn't work with remote actors but helps for testing
-        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort)
+        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort)
         schedule()
       }
     }
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 065b62a4b2..50e93cd00f 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -15,21 +15,24 @@ import spark.ui.JettyUtils._
  * Web UI server for the standalone master.
  */
 private[spark]
-class MasterWebUI(val master: ActorRef) extends Logging {
+class MasterWebUI(val master: ActorRef, requestedPort: Option[Int]) extends Logging {
   implicit val timeout = Duration.create(
     System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
   val host = Utils.localHostName()
-  val port = Option(System.getProperty("master.ui.port"))
-    .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
+  val port = requestedPort.getOrElse(
+    System.getProperty("master.ui.port", MasterWebUI.DEFAULT_PORT).toInt)
+
   var server: Option[Server] = None
+  var boundPort: Option[Int] = None
 
   val applicationPage = new ApplicationPage(this)
   val indexPage = new IndexPage(this)
 
   def start() {
     try {
-      val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
       server = Some(srv)
+      boundPort = Some(bPort)
       logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 28553b6c02..b6a26245fc 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -45,7 +45,7 @@ private[spark] class Worker(
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
-  val webUi = new WorkerWebUI(self, workDir)
+  val webUi = new WorkerWebUI(self, workDir, Some(webUiPort))
 
   var coresUsed = 0
   var memoryUsed = 0
@@ -77,14 +77,14 @@ private[spark] class Worker(
     sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
-    connectToMaster()
     webUi.start()
+    connectToMaster()
   }
 
   def connectToMaster() {
     logInfo("Connecting to master " + masterUrl)
     master = context.actorFor(Master.toAkkaUrl(masterUrl))
-    master ! RegisterWorker(workerId, host, port, cores, memory, webUiPort, publicAddress)
+    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     context.watch(master) // Doesn't work with remote actors, but useful for testing
   }
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index ee3889192d..2615458dee 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -19,13 +19,16 @@ import spark.ui.JettyUtils._
  * Web UI server for the standalone worker.
  */
 private[spark]
-class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
+class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
+    extends Logging {
   implicit val timeout = Timeout(
     Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
   val host = Utils.localHostName()
-  val port = Option(System.getProperty("wroker.ui.port"))
-    .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+  val port = requestedPort.getOrElse(
+    System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+
   var server: Option[Server] = None
+  var boundPort: Option[Int] = None
 
   val indexPage = new IndexPage(this)
 
@@ -38,9 +41,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
 
   def start() {
     try {
-      val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+      val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
       server = Some(srv)
-      logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
+      boundPort = Some(bPort)
+      logInfo("Started Worker web UI at http://%s:%d".format(host, bPort))
     } catch {
       case e: Exception =>
         logError("Failed to create Worker JettyUtils", e)
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index 85d6a7e867..1f665cbb42 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -95,7 +95,7 @@ private[spark] object JettyUtils extends Logging {
       val server = new Server(currentPort)
       server.setHandler(handlerList)
       Try { server.start() } match {
-        case s: Success[_] => (server, currentPort)
+        case s: Success[_] => (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>
           server.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
-- 
GitLab