From 93e8ed85aa1fbcb6428934b30d01f2b4090538b9 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 24 Jun 2013 13:11:18 -0700
Subject: [PATCH] Work around for initalization issue

---
 core/src/main/scala/spark/SparkContext.scala          |  5 +++--
 core/src/main/scala/spark/ui/SparkUI.scala            | 10 +++++++++-
 core/src/main/scala/spark/ui/jobs/IndexPage.scala     |  2 +-
 core/src/main/scala/spark/ui/jobs/JobProgressUI.scala |  9 +++++++--
 core/src/main/scala/spark/ui/jobs/StagePage.scala     |  2 +-
 5 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 901cda4174..1f420d73f7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -102,6 +102,9 @@ class SparkContext(
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
 
+  // Initalize the Spark UI
+  private[spark] val ui = new SparkUI(this)
+  ui.bind()
 
   // Add each JAR given through the constructor
   if (jars != null) {
@@ -211,8 +214,6 @@ class SparkContext(
   @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()
 
-  // Start the Spark UI
-  private[spark] val ui = new SparkUI(this)
   ui.start()
 
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 66dcf07384..2d5a328015 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -26,7 +26,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val jobs = new JobProgressUI(sc)
   val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
 
-  def start() {
+  def bind() {
     /** Start an HTTP server to run the Web interface */
     try {
       val (server, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
@@ -38,6 +38,14 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
       System.exit(1)
     }
   }
+  /** Initialize all components of the server */
+  def start() {
+    // NOTE: This is decoupled from bind() because of the following dependency cycle:
+    //  DAGScheduler() requires that the port of this server is known
+    //  This server must register all handlers, including JobProgressUI, before binding
+    //  JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+    jobs.start()
+  }
 
   private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
 }
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 2e2dcdfbc6..134c93091d 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -15,7 +15,7 @@ import spark.storage.StorageLevel
 
 /** Page showing list of all ongoing and recently finished stages */
 class IndexPage(parent: JobProgressUI) {
-  val listener = parent.listener
+  def listener = parent.listener
   val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 70f8e431f2..99f9f2d9f6 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -18,10 +18,15 @@ import spark.Success
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {
-  val listener = new JobProgressListener
+  private var _listener: Option[JobProgressListener] = None
+  def listener = _listener.get
+
   val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
 
-  sc.addSparkListener(listener)
+  def start() {
+    _listener = Some(new JobProgressListener)
+    sc.addSparkListener(listener)
+  }
 
   private val indexPage = new IndexPage(this)
   private val stagePage = new StagePage(this)
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index 42eb1f9eef..8a488498d9 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -13,7 +13,7 @@ import spark.executor.TaskMetrics
 
 /** Page showing statistics and task list for a given stage */
 class StagePage(parent: JobProgressUI) {
-  val listener = parent.listener
+  def listener = parent.listener
   val dateFmt = parent.dateFmt
 
   def render(request: HttpServletRequest): Seq[Node] = {
-- 
GitLab