Skip to content
Snippets Groups Projects
Commit fc94576e authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Adding worker version of UI

parent ee73c09a
No related branches found
No related tags found
No related merge requests found
...@@ -20,10 +20,10 @@ import javax.servlet.http.HttpServletRequest ...@@ -20,10 +20,10 @@ import javax.servlet.http.HttpServletRequest
*/ */
private[spark] private[spark]
class MasterWebUI(master: ActorRef) extends Logging { class MasterWebUI(master: ActorRef) extends Logging {
implicit val timeout = Duration.create(
implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
val host = Utils.localHostName() val host = Utils.localHostName()
val port = Option(System.getProperty("spark.ui.port")) val port = Option(System.getProperty("master.ui.port"))
.getOrElse(MasterWebUI.DEFAULT_PORT).toInt .getOrElse(MasterWebUI.DEFAULT_PORT).toInt
def start() { def start() {
...@@ -82,13 +82,13 @@ class MasterWebUI(master: ActorRef) extends Logging { ...@@ -82,13 +82,13 @@ class MasterWebUI(master: ActorRef) extends Logging {
<div class="span12"> <div class="span12">
<h3> Executor Summary </h3> <h3> Executor Summary </h3>
<br/> <br/>
{executorsTable(app.executors.values.toList)} {executorTable(app.executors.values.toList)}
</div> </div>
</div>; </div>;
UtilsWebUI.makePage(content, "Application Info: " + app.desc.name) UtilsWebUI.makePage(content, "Application Info: " + app.desc.name)
} }
def executorsTable(executors: Seq[ExecutorInfo]): Seq[Node] = { def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed"> <table class="table table-bordered table-striped table-condensed">
<thead> <thead>
<tr> <tr>
...@@ -119,7 +119,7 @@ class MasterWebUI(master: ActorRef) extends Logging { ...@@ -119,7 +119,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
<a href={"%s/log?appId=%s&executorId=%s&logType=stdout" <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
<a href={"%s/log?appId=%s&executorId=%s&logType=stderr" <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
</td> </td>
</tr> </tr>
} }
...@@ -135,7 +135,7 @@ class MasterWebUI(master: ActorRef) extends Logging { ...@@ -135,7 +135,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
<ul class="unstyled"> <ul class="unstyled">
<li><strong>URL:</strong>{state.uri}</li> <li><strong>URL:</strong>{state.uri}</li>
<li><strong>Workers:</strong>{state.workers.size}</li> <li><strong>Workers:</strong>{state.workers.size}</li>
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum}Total, <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li> {state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong> <li><strong>Memory:</strong>
{Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total, {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
...@@ -247,5 +247,5 @@ class MasterWebUI(master: ActorRef) extends Logging { ...@@ -247,5 +247,5 @@ class MasterWebUI(master: ActorRef) extends Logging {
object MasterWebUI { object MasterWebUI {
val STATIC_RESOURCE_DIR = "spark/deploy/static" val STATIC_RESOURCE_DIR = "spark/deploy/static"
val DEFAULT_PORT = "34000" val DEFAULT_PORT = "8080"
} }
\ No newline at end of file
...@@ -88,16 +88,8 @@ private[spark] class Worker( ...@@ -88,16 +88,8 @@ private[spark] class Worker(
} }
def startWebUi() { def startWebUi() {
val webUi = new WorkerWebUI(context.system, self, workDir) val webUi = new WorkerWebUI(self, workDir)
/* webUi.start()
try {
AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler)
} catch {
case e: Exception =>
logError("Failed to create web UI", e)
System.exit(1)
}
*/
} }
override def receive = { override def receive = {
......
...@@ -8,22 +8,137 @@ import akka.util.duration._ ...@@ -8,22 +8,137 @@ import akka.util.duration._
import cc.spray.Directives import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._ import cc.spray.typeconversion.TwirlSupport._
import cc.spray.http.MediaTypes import cc.spray.http.MediaTypes
import cc.spray.typeconversion.SprayJsonSupport._
import spark.deploy.{WorkerState, RequestWorkerState} import spark.deploy.{WorkerState, RequestWorkerState}
import spark.deploy.JsonProtocol._ import spark.deploy.JsonProtocol._
import java.io.File import java.io.File
import spark.util.{WebUI => UtilsWebUI}
import spark.{Utils, Logging}
import org.eclipse.jetty.server.Handler
import spark.util.WebUI._
import spark.deploy.WorkerState
import scala.io.Source
import javax.servlet.http.HttpServletRequest
import xml.Node
/** /**
* Web UI server for the standalone worker. * Web UI server for the standalone worker.
*/ */
private[spark] private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) extends Directives { class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
val RESOURCE_DIR = "spark/deploy/worker/webui" implicit val timeout = Timeout(
val STATIC_RESOURCE_DIR = "spark/deploy/static" Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
val host = Utils.localHostName()
implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) val port = Option(System.getProperty("wroker.ui.port"))
.getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
val handlers = Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
("*", (request: HttpServletRequest) => index)
)
def start() {
try {
val (server, boundPort) = UtilsWebUI.startJettyServer("0.0.0.0", port, handlers)
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Worker WebUI", e)
System.exit(1)
}
}
def index(): Seq[Node] = {
val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 3 seconds)
val content =
<div class="row"> <!-- Worker Details -->
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> {workerState.workerId}</li>
<li><strong>
Master URL:</strong> {workerState.masterUrl}
</li>
<li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
<li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
</ul>
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
</div>
<hr/>
<div class="row"> <!-- Running Executors -->
<div class="span12">
<h3> Running Executors {workerState.executors.size} </h3>
<br/>
{executorTable(workerState.executors)}
</div>
</div>
<hr/>
<div class="row"> <!-- Finished Executors -->
<div class="span12">
<h3> Finished Executors </h3>
<br/>
{executorTable(workerState.finishedExecutors)}
</div>
</div>;
UtilsWebUI.makePage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
}
def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = {
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>ExecutorID</th>
<th>Cores</th>
<th>Memory</th>
<th>Job Details</th>
<th>Logs</th>
</tr>
</thead>
<tbody>
{executors.map(executorRow)}
</tbody>
</table>
}
def executorRow(executor: ExecutorRunner): Seq[Node] = {
<tr>
<td>{executor.execId}</td>
<td>{executor.cores}</td>
<td>{Utils.memoryMegabytesToString(executor.memory)}</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> {executor.appId}</li>
<li><strong>Name:</strong> {executor.appDesc.name}</li>
<li><strong>User:</strong> {executor.appDesc.user}</li>
</ul>
</td>
<td>
<a href={"log?appId=%s&executorId=%s&logType=stdout"
.format(executor.appId, executor.execId)}>stdout</a>
<a href={"log?appId=%s&executorId=%s&logType=stderr"
.format(executor.appId, executor.execId)}>stderr</a>
</td>
</tr>
}
def log(request: HttpServletRequest): String = {
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType")
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
val source = Source.fromFile(path)
val lines = source.mkString
source.close()
lines
}
/*
val handler = { val handler = {
get { get {
(path("") & parameters('format ?)) { (path("") & parameters('format ?)) {
...@@ -54,4 +169,10 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File) ...@@ -54,4 +169,10 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef, workDir: File)
getFromResourceDirectory(RESOURCE_DIR) getFromResourceDirectory(RESOURCE_DIR)
} }
} }
*/
}
object WorkerWebUI {
val STATIC_RESOURCE_DIR = "spark/deploy/static"
val DEFAULT_PORT="8081"
} }
...@@ -16,13 +16,14 @@ import annotation.tailrec ...@@ -16,13 +16,14 @@ import annotation.tailrec
object WebUI extends Logging { object WebUI extends Logging {
type Responder[T] = HttpServletRequest => T type Responder[T] = HttpServletRequest => T
implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler = { implicit def jsonResponderToHandler(responder: Responder[JSONObject]): Handler =
createHandler(responder, "text/json") createHandler(responder, "text/json")
}
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = { implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html") createHandler(responder, "text/html")
}
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = { def createHandler[T <% AnyRef](responder: Responder[T], contentType: String): Handler = {
new AbstractHandler { new AbstractHandler {
...@@ -40,12 +41,28 @@ object WebUI extends Logging { ...@@ -40,12 +41,28 @@ object WebUI extends Logging {
} }
} }
/** Create and return a staticHandler if resourceBase can be located */
def createStaticHandler(resourceBase: String): ResourceHandler = { def createStaticHandler(resourceBase: String): ResourceHandler = {
val staticHandler = new ResourceHandler val staticHandler = new ResourceHandler
val resource = getClass.getClassLoader.getResource(resourceBase) Option(getClass.getClassLoader.getResource(resourceBase)) match {
staticHandler.setResourceBase(resource.toString) case Some(res) =>
staticHandler staticHandler.setResourceBase (res.toString)
staticHandler
}
}
/*
/** Create and return a staticHandler if resourceBase can be located */
def createStaticHandler(resourceBase: String): Option[ResourceHandler] = {
val staticHandler = new ResourceHandler
Option(getClass.getClassLoader.getResource(resourceBase)) match {
case Some(res) =>
staticHandler.setResourceBase (res.toString)
Some(staticHandler)
case None => None
}
} }
*/
def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]): (Server, Int) = { def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]): (Server, Int) = {
val handlersToRegister = handlers.map { case(path, handler) => val handlersToRegister = handlers.map { case(path, handler) =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment