From fc94576ece99f2b224a951b32f0f6360701b7cd3 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Tue, 18 Jun 2013 13:11:57 -0700
Subject: [PATCH] Adding worker version of UI

---
 .../spark/deploy/master/MasterWebUI.scala     |  16 +--
 .../scala/spark/deploy/worker/Worker.scala    |  12 +-
 .../spark/deploy/worker/WorkerWebUI.scala     | 135 +++++++++++++++++-
 core/src/main/scala/spark/util/WebUI.scala    |  31 +++-
 4 files changed, 162 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index 34f50fd5e4..a2e9dfd762 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -20,10 +20,10 @@ import javax.servlet.http.HttpServletRequest
  */
 private[spark]
 class MasterWebUI(master: ActorRef) extends Logging {
-
-  implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+  implicit val timeout = Duration.create(
+    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
   val host = Utils.localHostName()
-  val port = Option(System.getProperty("spark.ui.port"))
+  val port = Option(System.getProperty("master.ui.port"))
     .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
 
   def start() {
@@ -82,13 +82,13 @@ class MasterWebUI(master: ActorRef) extends Logging {
         <div class="span12">
           <h3> Executor Summary </h3>
           <br/>
-          {executorsTable(app.executors.values.toList)}
+          {executorTable(app.executors.values.toList)}
         </div>
       </div>;
       UtilsWebUI.makePage(content, "Application Info: " + app.desc.name)
   }
 
-  def executorsTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
+  def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
     <table class="table table-bordered table-striped table-condensed">
       <thead>
         <tr>
@@ -119,7 +119,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
         <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
             .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
         <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
-          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
+          .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
       </td>
     </tr>
   }
@@ -135,7 +135,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
           <ul class="unstyled">
             <li><strong>URL:</strong>{state.uri}</li>
             <li><strong>Workers:</strong>{state.workers.size}</li>
-            <li><strong>Cores:</strong> {state.workers.map(_.cores).sum}Total,
+            <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
               {state.workers.map(_.coresUsed).sum} Used</li>
             <li><strong>Memory:</strong>
               {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
@@ -247,5 +247,5 @@ class MasterWebUI(master: ActorRef) extends Logging {
 
 object MasterWebUI {
   val STATIC_RESOURCE_DIR = "spark/deploy/static"
-  val DEFAULT_PORT = "34000"
+  val DEFAULT_PORT = "8080"
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index f8fdab927e..3878fe3f7b 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -88,16 +88,8 @@ private[spark] class Worker(
   }
 
   def startWebUi() {
-    val webUi = new WorkerWebUI(context.system, self, workDir)
-    /*
-    try {
-      AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
-    } catch {
-      case e: Exception =>
-        logError("Failed to create web UI", e)
-        System.exit(1)
-    }
-    */
+    val webUi = new WorkerWebUI(self, workDir)
+    webUi.start()
   }
 
   override def receive = {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 3235c50d1b..b8b4b89738 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -8,22 +8,137 @@ import akka.util.duration._
 import cc.spray.Directives
 import cc.spray.typeconversion.TwirlSupport._
 import cc.spray.http.MediaTypes
-import cc.spray.typeconversion.SprayJsonSupport._
 
 import spark.deploy.{WorkerState, RequestWorkerState}
 import spark.deploy.JsonProtocol._
 import java.io.File
+import spark.util.{WebUI => UtilsWebUI}
+import spark.{Utils, Logging}
+import org.eclipse.jetty.server.Handler
+import spark.util.WebUI._
+import spark.deploy.WorkerState
+import scala.io.Source
+import javax.servlet.http.HttpServletRequest
+import xml.Node
 
 /**
  * Web UI server for the standalone worker.
  */
 private[spark]
-class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives {
-  val RESOURCE_DIR = "spark/deploy/worker/webui"
-  val STATIC_RESOURCE_DIR = "spark/deploy/static"
-  
-  implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
-  
+class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
+  implicit val timeout = Timeout(
+    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
+  val host = Utils.localHostName()
+  val port = Option(System.getProperty("wroker.ui.port"))
+    .getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+
+  val handlers = Array[(String, Handler)](
+    ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
+    ("/log", (request: HttpServletRequest) => log(request)),
+    ("*", (request: HttpServletRequest) => index)
+  )
+
+  def start() {
+    try {
+      val (server, boundPort) = UtilsWebUI.startJettyServer("0.0.0.0", port, handlers)
+      logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
+    } catch {
+      case e: Exception =>
+        logError("Failed to create Worker WebUI", e)
+        System.exit(1)
+    }
+  }
+
+  def index(): Seq[Node] = {
+    val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+    val workerState = Await.result(stateFuture, 3 seconds)
+    val content =
+      <div class="row"> <!-- Worker Details -->
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>ID:</strong> {workerState.workerId}</li>
+            <li><strong>
+              Master URL:</strong> {workerState.masterUrl}
+            </li>
+            <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
+            <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
+              ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
+          </ul>
+          <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
+        </div>
+      </div>
+      <hr/>
+
+      <div class="row"> <!-- Running Executors -->
+        <div class="span12">
+          <h3> Running Executors {workerState.executors.size} </h3>
+          <br/>
+          {executorTable(workerState.executors)}
+        </div>
+      </div>
+      <hr/>
+
+      <div class="row"> <!-- Finished Executors  -->
+        <div class="span12">
+          <h3> Finished Executors </h3>
+          <br/>
+          {executorTable(workerState.finishedExecutors)}
+        </div>
+      </div>;
+
+    UtilsWebUI.makePage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+  }
+
+  def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = {
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>
+        <tr>
+          <th>ExecutorID</th>
+          <th>Cores</th>
+          <th>Memory</th>
+          <th>Job Details</th>
+          <th>Logs</th>
+        </tr>
+      </thead>
+      <tbody>
+        {executors.map(executorRow)}
+    </tbody>
+    </table>
+  }
+
+  def executorRow(executor: ExecutorRunner): Seq[Node] = {
+    <tr>
+      <td>{executor.execId}</td>
+      <td>{executor.cores}</td>
+      <td>{Utils.memoryMegabytesToString(executor.memory)}</td>
+      <td>
+        <ul class="unstyled">
+          <li><strong>ID:</strong> {executor.appId}</li>
+          <li><strong>Name:</strong> {executor.appDesc.name}</li>
+          <li><strong>User:</strong> {executor.appDesc.user}</li>
+        </ul>
+      </td>
+      <td>
+        <a href={"log?appId=%s&executorId=%s&logType=stdout"
+          .format(executor.appId, executor.execId)}>stdout</a>
+        <a href={"log?appId=%s&executorId=%s&logType=stderr"
+          .format(executor.appId, executor.execId)}>stderr</a>
+      </td>
+    </tr>
+  }
+
+  def log(request: HttpServletRequest): String = {
+    val appId = request.getParameter("appId")
+    val executorId = request.getParameter("executorId")
+    val logType = request.getParameter("logType")
+    val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
+    val source = Source.fromFile(path)
+    val lines = source.mkString
+    source.close()
+    lines
+  }
+
+  /*
   val handler = {
     get {
       (path("") & parameters('format ?)) {
@@ -54,4 +169,10 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
+  */
+}
+
+object WorkerWebUI {
+  val STATIC_RESOURCE_DIR = "spark/deploy/static"
+  val DEFAULT_PORT="8081"
 }
diff --git a/core/src/main/scala/spark/util/WebUI.scala b/core/src/main/scala/spark/util/WebUI.scala
index eaacd95691..34b776f1d8 100644
--- a/core/src/main/scala/spark/util/WebUI.scala
+++ b/core/src/main/scala/spark/util/WebUI.scala
@@ -16,13 +16,14 @@ import annotation.tailrec
 object WebUI extends Logging {
   type Responder[T] = HttpServletRequest => T
 
-  implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler = {
+  implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler =
     createHandler(responder, "text/json")
-  }
 
-  implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = {
+  implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
     createHandler(responder, "text/html")
-  }
+
+  implicit def textResponderToHandler(responder: Responder[String]): Handler =
+    createHandler(responder, "text/plain")
 
   def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = {
     new AbstractHandler {
@@ -40,12 +41,28 @@ object WebUI extends Logging {
     }
   }
 
+  /** Create and return a staticHandler if resourceBase can be located */
   def createStaticHandler(resourceBase: String): ResourceHandler = {
     val staticHandler = new ResourceHandler
-    val resource = getClass.getClassLoader.getResource(resourceBase)
-    staticHandler.setResourceBase(resource.toString)
-    staticHandler
+    Option(getClass.getClassLoader.getResource(resourceBase)) match {
+      case Some(res) =>
+        staticHandler.setResourceBase (res.toString)
+        staticHandler
+    }
+  }
+
+  /*
+  /** Create and return a staticHandler if resourceBase can be located */
+  def createStaticHandler(resourceBase: String): Option[ResourceHandler] = {
+    val staticHandler = new ResourceHandler
+    Option(getClass.getClassLoader.getResource(resourceBase)) match {
+      case Some(res) =>
+        staticHandler.setResourceBase (res.toString)
+        Some(staticHandler)
+      case None => None
+    }
   }
+  */
 
   def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]): (Server, Int) = {
     val handlersToRegister = handlers.map { case(path, handler) =>
-- 
GitLab