Skip to content
Snippets Groups Projects
Commit 362d996c authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Handful of changes based on matei's review

- Avoid exception when no tasks have finished for a stage
- Adding DOCTYPE so css renders properly
- Adding progress slider
parent 92a4c2a5
No related branches found
No related tags found
No related merge requests found
......@@ -485,6 +485,26 @@ private object Utils extends Logging {
"%.1f %s".formatLocal(Locale.US, value, unit)
}
/**
* Returns a human-readable string representing a duration such as "35ms"
*/
def msDurationToString(ms: Long): String = {
val second = 1000
val minute = 60 * second
val hour = 60 * minute
ms match {
case t if t < second =>
"%dms".format(t)
case t if t < minute =>
"%d.%03ds".format(t / second, t % second)
case t if t < hour =>
"%d:%02d".format(t / minute, (t % minute) / second)
case t =>
"%d:%02d:%02d".format(t / hour, t % hour / minute, (t % hour) % minute / second)
}
}
/**
* Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
*/
......
......@@ -26,7 +26,7 @@ private[spark] object JettyUtils extends Logging {
createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html")
createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
implicit def textResponderToHandler(responder: Responder[String]): Handler =
createHandler(responder, "text/plain")
......
......@@ -19,8 +19,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total",
"Shuffle Activity", "Stored RDD")
val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Progress",
"Tasks: Complete/Total", "Shuffle Activity", "Stored RDD")
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq
......@@ -43,6 +43,20 @@ private[spark] class IndexPage(parent: JobProgressUI) {
}
}
def makeSlider(completed: Int, total: Int): Seq[Node] = {
val width=130
val height=15
val completeWidth = (completed.toDouble / total) * width
<svg width={width.toString} height={height.toString}>
<rect width={width.toString} height={height.toString}
fill="white" stroke="black" stroke-width="1" />
<rect width={completeWidth.toString} height={height.toString}
fill="rgb(206,206,247)" stroke="black" stroke-width="1" />
</svg>
}
def stageRow(showLink: Boolean = true)(s: Stage): Seq[Node] = {
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
......@@ -55,6 +69,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case (false, true) => "Write"
case _ => ""
}
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions
<tr>
{if (showLink) {<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>}
......@@ -63,7 +79,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<td>{submissionTime}</td>
<td>{getElapsedTime(s.submissionTime,
s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
<td>{listener.stageToTasksComplete.getOrElse(s.id, 0)} / {s.numPartitions}
<td>{makeSlider(completedTasks, totalTasks)}</td>
<td>{completedTasks} / {totalTasks}
{listener.stageToTasksFailed.getOrElse(s.id, 0) match {
case f if f > 0 => "(%s failed)".format(f)
case _ =>
......
......@@ -17,6 +17,7 @@ import spark.scheduler._
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
import spark.Success
import spark.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
......@@ -32,7 +33,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
sc.addSparkListener(listener)
}
def formatDuration(ms: Long) = Duration(ms, "milliseconds").printHMS
def formatDuration(ms: Long) = Utils.msDurationToString(ms)
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
......
......@@ -19,20 +19,30 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
<h2>Summary Metrics</h2> No tasks have finished yet
<h2>Tasks</h2> No tasks have finished yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId))
}
val tasks = listener.stageToTaskInfos(stageId)
val shuffleRead = listener.hasShuffleRead(stageId)
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
Seq("Task ID", "Service Time", "Locality Level", "Worker", "Launch Time") ++
Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil}
val taskTable = listingTable(taskHeaders, taskRow, tasks)
val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map(
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
def getQuantileCols(data: Seq[Double]) =
......
......@@ -2,10 +2,11 @@ package spark.ui
import org.scalatest.FunSuite
import org.eclipse.jetty.server.Server
import util.{Try, Success, Failure}
import java.net.ServerSocket
import scala.util.{Failure, Success, Try}
import spark.Utils
private[spark] class UISuite extends FunSuite {
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 33333
val server = new Server(startPort)
......@@ -23,7 +24,23 @@ private[spark] class UISuite extends FunSuite {
assert(boundPort != 0)
Try {new ServerSocket(boundPort)} match {
case Success(s) => fail("Port %s doesn't seem used by jetty server".format(boundPort))
case Failure(e) =>
case Failure (e) =>
}
}
test("string formatting of time durations") {
val second = 1000
val minute = second * 60
val hour = minute * 60
def str = Utils.msDurationToString(_)
assert(str(123) === "123ms")
assert(str(second) === "1.000s")
assert(str(second + 452) === "1.452s")
assert(str(hour) === "1:00:00")
assert(str(minute) === "1:00")
assert(str(minute + 4 * second + 34) === "1:04")
assert(str(10 * hour + minute + 4 * second) === "10:01:04")
assert(str(10 * hour + 59 * minute + 59 * second + 999) === "10:59:59")
}
}
package spark.streaming
import spark.Utils
case class Duration (private val millis: Long) {
def < (that: Duration): Boolean = (this.millis < that.millis)
......@@ -32,8 +34,10 @@ case class Duration (private val millis: Long) {
def toFormattedString: String = millis.toString
def milliseconds: Long = millis
}
def prettyPrint = Utils.msDurationToString(millis)
}
/**
* Helper object that creates instance of [[spark.streaming.Duration]] representing
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment