Skip to content
Snippets Groups Projects
Commit e66f5701 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Completely hacked version of block manager UI in jetty

parent 60fbf7e4
No related branches found
No related tags found
No related merge requests found
......@@ -11,9 +11,11 @@ import spark.Utils
import spark.util.WebUI
import org.eclipse.jetty.server.handler.{HandlerList, ContextHandler, ResourceHandler}
import org.eclipse.jetty.server.Handler
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import xml.Elem
import xml.Node
import java.net.URLClassLoader
import spark.util.WebUI._
/**
......@@ -38,7 +40,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
/** Start a HTTP server to run the Web interface */
def start() {
try {
AkkaUtils.startJettyServer("0.0.0.0", port, handlers)
WebUI.startJettyServer("0.0.0.0", port, handlers)
logInfo("Started BlockManager web UI at http://%s:%d".format(host, port))
} catch {
case e: Exception =>
......@@ -55,10 +57,109 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
val handlers = Array[(String, Handler)](
("/static", staticHandler),
("*", WebUI.makeHandler(WebUI.makePage(indexContent, "Spark Storage")))
("/rdd", (request: HttpServletRequest) => rddPage(request)),
("*", (request: HttpServletRequest) => WebUI.makePage(indexPage, "Spark Storage"))
)
def indexContent: Seq[Node] = {
def rddPage(request: HttpServletRequest): Seq[Node] = {
val id = request.getParameter("id")
val prefix = "rdd_" + id.toString
val storageStatusList = sc.getExecutorStorageStatus
val filteredStorageStatusList = StorageUtils.
filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
val content =
<div class="row">
<div class="span12">
<ul class="unstyled">
<li>
<strong>Storage Level:</strong>
{rddInfo.storageLevel.description}
</li>
<li>
<strong>Cached Partitions:</strong>
{rddInfo.numCachedPartitions}
</li>
<li>
<strong>Total Partitions:</strong>
{rddInfo.numPartitions}
</li>
<li>
<strong>Memory Size:</strong>
{Utils.memoryBytesToString(rddInfo.memSize)}
</li>
<li>
<strong>Disk Size:</strong>
{Utils.memoryBytesToString(rddInfo.diskSize)}
</li>
</ul>
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
<h3> RDD Summary </h3>
<br/>
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>Block Name</th>
<th>Storage Level</th>
<th>Size in Memory</th>
<th>Size on Disk</th>
</tr>
</thead>
<tbody>
{storageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1).map { case (k,v) =>
<tr>
<td>{k}</td>
<td>
{v.storageLevel.description}
</td>
<td>{Utils.memoryBytesToString(v.memSize)}</td>
<td>{Utils.memoryBytesToString(v.diskSize)}</td>
</tr>
}
}
</tbody>
</table>
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
<h3> Worker Summary </h3>
<br/>
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>Host</th>
<th>Memory Usage</th>
<th>Disk Usage</th>
</tr>
</thead>
<tbody>
{for(status <- storageStatusList) {
<tr>
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
{Utils.memoryBytesToString(status.memUsed(prefix))}
({Utils.memoryBytesToString(status.memRemaining)} Total Available)
</td>
<td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
</tr>
}}
</tbody>
</table>
</div>
</div>;
WebUI.makePage(content, "RDD Info: " + id)
}
def indexPage: Seq[Node] = {
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
......@@ -77,42 +178,36 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
</ul>
</div>
</div>
<hr/>
}
/*
val handler = {
get {
path("") {
completeWith {
// Request the current storage status from the Master
val storageStatusList = sc.getExecutorStorageStatus
// Calculate macro-level statistics
val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_+_).getOrElse(0L)
val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
spark.storage.html.index.
render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList)
}
} ~
path("rdd") {
parameter("id") { id =>
completeWith {
val prefix = "rdd_" + id.toString
val storageStatusList = sc.getExecutorStorageStatus
val filteredStorageStatusList = StorageUtils.
filterStorageStatusByPrefix(storageStatusList, prefix)
val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head
spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList)
<hr/>
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<tr>
<th>RDD Name</th>
<th>Storage Level</th>
<th>Cached Partitions</th>
<th>Fraction Partitions Cached</th>
<th>Size in Memory</th>
<th>Size on Disk</th>
</tr>
</thead>
<tbody>
{for (rdd <- rdds) yield
<tr>
<td>
<a href={"/rdd?id=%s".format(rdd.id)}>
{rdd.name}
</a>
</td>
<td>{rdd.storageLevel.description}
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
<td>{Utils.memoryBytesToString(rdd.memSize)}</td>
<td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
</tr>
}
}
} ~
pathPrefix("static") {
getFromResourceDirectory(STATIC_RESOURCE_DIR)
}
}
*/
</tbody>
</table>
}
private[spark] def appUIAddress = "http://" + host + ":" + port
}
......@@ -63,27 +63,4 @@ private[spark] object AkkaUtils {
val boundPort = provider.asInstanceOf[RemoteActorRefProvider].transport.address.port.get
return (actorSystem, boundPort)
}
/**
* Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
* handle requests. Returns the bound port or throws a SparkException on failure.
* TODO: Not changing ip to host here - is it required ?
*/
def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]) = {
val handlersToRegister = handlers.map { case(path, handler) =>
if (path == "*") {
handler
} else {
val contextHandler = new ContextHandler(path)
contextHandler.setHandler(handler)
contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
}
}
val handlerList = new HandlerList
handlerList.setHandlers(handlersToRegister)
val server = new Server(port)
server.setHandler(handlerList)
server.start()
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment