Skip to content
Snippets Groups Projects
Commit a86bb459 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Showing shuffle status and purging old stages

parent 3485e733
No related branches found
No related tags found
No related merge requests found
......@@ -22,7 +22,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
)
val storage = new BlockManagerUI(sc)
val jobs = new JobProgressUI(sc)
val allHandlers = handlers ++ storage.getHandlers ++ jobs.getHandlers
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
def start() {
/** Start an HTTP server to run the Web interface */
......
......@@ -19,7 +19,8 @@ class IndexPage(parent: JobProgressUI) {
val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total")
val stageHeaders = Seq("Stage ID", "Origin", "Submitted", "Duration", "Tasks: Complete/Total",
"Shuffle Activity")
val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.toSeq
......@@ -44,6 +45,14 @@ class IndexPage(parent: JobProgressUI) {
case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown"
}
val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
val shuffleString = (read, write) match {
case (true, true) => "Read/Write"
case (true, false) => "Read"
case (false, true) => "Write"
case _ => "None"
}
<tr>
<td><a href={"/stages/stage?id=%s".format(s.id)}>{s.id}</a></td>
<td>{s.origin}</td>
......@@ -56,6 +65,7 @@ class IndexPage(parent: JobProgressUI) {
case _ =>
}}
</td>
<td>{shuffleString}</td>
</tr>
}
}
......@@ -33,11 +33,16 @@ private[spark] class JobProgressUI(sc: SparkContext) {
}
private[spark] class JobProgressListener extends SparkListener {
// TODO(pwendell) Currently does not handle entirely failed stages
// How many stages to remember
val RETAINED_STAGES = 1000
val activeStages = HashSet[Stage]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
val completedStages = ListBuffer[Stage]() // Todo (pwendell): Evict these over time
val completedStages = ListBuffer[Stage]()
/** No-op: this listener does nothing when a job starts. */
override def onJobStart(jobStart: SparkListenerJobStart) {
}
......@@ -45,6 +50,18 @@ private[spark] class JobProgressListener extends SparkListener {
val stage = stageCompleted.stageInfo.stage
activeStages -= stage
stage +=: completedStages
if (completedStages.size > RETAINED_STAGES) purgeStages()
}
/**
 * Evict a batch of completed stages (one tenth of RETAINED_STAGES) from the tail of
 * `completedStages`, and drop the per-stage task bookkeeping kept for each of them so
 * it can be garbage collected.
 */
def purgeStages() {
  val evictCount = RETAINED_STAGES / 10
  // Forget the task counters and task infos of every stage about to be dropped
  for (expired <- completedStages.takeRight(evictCount)) {
    val expiredId = expired.id
    stageToTasksComplete.remove(expiredId)
    stageToTasksFailed.remove(expiredId)
    stageToTaskInfos.remove(expiredId)
  }
  // Then drop those same stages from the end of the completed list
  completedStages.trimEnd(evictCount)
}
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
......@@ -64,4 +81,22 @@ private[spark] class JobProgressListener extends SparkListener {
}
/** No-op: this listener does nothing when a job ends. */
override def onJobEnd(jobEnd: SparkListenerEvents) {
}
/**
 * Is this stage's input from a shuffle read.
 *
 * The answer is taken from the first recorded task of the stage whose metrics are
 * non-null; the remaining tasks are not inspected.
 *
 * @param stageID id of the stage to look up in `stageToTaskInfos`
 * @return true if that task's metrics define `shuffleReadMetrics`; false if the stage is
 *         unknown or no task with non-null metrics has been recorded for it yet
 */
def hasShuffleRead(stageID: Int): Boolean = {
  // `find` stops at the first match, so we still avoid scanning all tasks, without
  // relying on a nonlocal `return` from inside a closure.
  stageToTaskInfos.get(stageID)
    .flatMap(tasks => tasks.find(task => task._2 != null))
    .map(task => task._2.shuffleReadMetrics.isDefined)
    .getOrElse(false) // No tasks have finished for this stage
}
/**
 * Is this stage's output to a shuffle write.
 *
 * The answer is taken from the first recorded task of the stage whose metrics are
 * non-null; the remaining tasks are not inspected.
 *
 * @param stageID id of the stage to look up in `stageToTaskInfos`
 * @return true if that task's metrics define `shuffleWriteMetrics`; false if the stage is
 *         unknown or no task with non-null metrics has been recorded for it yet
 */
def hasShuffleWrite(stageID: Int): Boolean = {
  // `find` stops at the first match, so we still avoid scanning all tasks, without
  // relying on a nonlocal `return` from inside a closure.
  stageToTaskInfos.get(stageID)
    .flatMap(tasks => tasks.find(task => task._2 != null))
    .map(task => task._2.shuffleWriteMetrics.isDefined)
    .getOrElse(false) // No tasks have finished for this stage
}
}
\ No newline at end of file
......@@ -4,8 +4,6 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.ListBuffer
import spark.ui.UIUtils._
import spark.util.Distribution
import spark.scheduler.cluster.TaskInfo
......@@ -22,13 +20,13 @@ class StagePage(parent: JobProgressUI) {
val stageId = request.getParameter("id").toInt
val tasks = listener.stageToTaskInfos(stageId)
val hasShuffleRead = tasks.head._2.shuffleReadMetrics.isDefined
val hasShuffleWrite = tasks.head._2.shuffleWriteMetrics.isDefined
val shuffleRead = listener.hasShuffleRead(stageId)
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders =
ListBuffer("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time")
if (hasShuffleRead) { taskHeaders += "Shuffle Read (bytes)" }
if (hasShuffleWrite) { taskHeaders += "Shuffle Write (bytes)" }
val taskHeaders: Seq[String] =
Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read (bytes)") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write (bytes)") else Nil}
val taskTable = listingTable(taskHeaders, taskRow, tasks)
......@@ -46,8 +44,8 @@ class StagePage(parent: JobProgressUI) {
val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
if (hasShuffleRead) shuffleReadQuantiles else Nil,
if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
if (shuffleRead) shuffleReadQuantiles else Nil,
if (shuffleWrite) shuffleWriteQuantiles else Nil)
val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
val quantileTable = listingTable(quantileHeaders, quantileRow, listings)
......@@ -72,6 +70,4 @@ class StagePage(parent: JobProgressUI) {
{metrics.shuffleWriteMetrics.map{m => <td>{m.shuffleBytesWritten}</td>}.getOrElse("") }
</tr>
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment