diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 901cda4174f5649f8f24bc223cd678b00b1c5737..1f420d73f7988ebce6b9dd90eeeca671343d2c2c 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -102,6 +102,9 @@ class SparkContext( private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]] private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) + // Initalize the Spark UI + private[spark] val ui = new SparkUI(this) + ui.bind() // Add each JAR given through the constructor if (jars != null) { @@ -211,8 +214,6 @@ class SparkContext( @volatile private var dagScheduler = new DAGScheduler(taskScheduler) dagScheduler.start() - // Start the Spark UI - private[spark] val ui = new SparkUI(this) ui.start() /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala index 66dcf07384339d4ebcda40e2b41beab4f68313a1..2d5a3280159e0b24d90fb7dd14525e627b3ece29 100644 --- a/core/src/main/scala/spark/ui/SparkUI.scala +++ b/core/src/main/scala/spark/ui/SparkUI.scala @@ -26,7 +26,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { val jobs = new JobProgressUI(sc) val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers - def start() { + def bind() { /** Start an HTTP server to run the Web interface */ try { val (server, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers) @@ -38,6 +38,14 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging { System.exit(1) } } + /** Initialize all components of the server */ + def start() { + // NOTE: This is decoupled from bind() because of the following dependency cycle: + // DAGScheduler() requires that the port of this server is known + // This server must register all handlers, including JobProgressUI, before binding + // JobProgressUI registers a listener with SparkContext, which requires sc to initialize + jobs.start() + } private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1") } diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 2e2dcdfbc6e5b5d0fecc76c258cfdf42bfcc1e67..134c93091d59ed43ee6c255363976c6ea2fb31f6 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -15,7 +15,7 @@ import spark.storage.StorageLevel /** Page showing list of all ongoing and recently finished stages */ class IndexPage(parent: JobProgressUI) { - val listener = parent.listener + def listener = parent.listener val dateFmt = parent.dateFmt def render(request: HttpServletRequest): Seq[Node] = { diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 70f8e431f2b553825b0a21a927b7783807be8585..99f9f2d9f6c625bc20299d56535e67b489980f5e 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -18,10 +18,15 @@ import spark.Success /** Web UI showing progress status of all jobs in the given SparkContext. */ private[spark] class JobProgressUI(val sc: SparkContext) { - val listener = new JobProgressListener + private var _listener: Option[JobProgressListener] = None + def listener = _listener.get + val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss") - sc.addSparkListener(listener) + def start() { + _listener = Some(new JobProgressListener) + sc.addSparkListener(listener) + } private val indexPage = new IndexPage(this) private val stagePage = new StagePage(this) diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 42eb1f9eef177a4e5cb93d735ab62704d1bf45a3..8a488498d98da24c4a707af51a72a4c0a2519544 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -13,7 +13,7 @@ import spark.executor.TaskMetrics /** Page showing statistics and task list for a given stage */ class StagePage(parent: JobProgressUI) { - val listener = parent.listener + def listener = parent.listener val dateFmt = parent.dateFmt def render(request: HttpServletRequest): Seq[Node] = {