Skip to content
Snippets Groups Projects
Commit ebe1efc8 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge remote-tracking branch 'pwendell/ui-updates'

parents fd666512 32b9d21a
No related branches found
No related tags found
No related merge requests found
...@@ -51,7 +51,7 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -51,7 +51,7 @@ private[spark] class JobProgressListener extends SparkListener {
val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos = val stageToTaskInfos =
HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]() HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {} override def onJobStart(jobStart: SparkListenerJobStart) {}
...@@ -78,17 +78,17 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -78,17 +78,17 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId val sid = taskEnd.task.stageId
val (failureInfo, metrics): (Option[ExceptionFailure], TaskMetrics) = val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match { taskEnd.reason match {
case e: ExceptionFailure => case e: ExceptionFailure =>
stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
(Some(e), e.metrics.get) (Some(e), e.metrics)
case _ => case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, taskEnd.taskMetrics) (None, Some(taskEnd.taskMetrics))
} }
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]()) sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskEnd.taskInfo, metrics, failureInfo)) taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList stageToTaskInfos(sid) = taskList
} }
...@@ -111,7 +111,7 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -111,7 +111,7 @@ private[spark] class JobProgressListener extends SparkListener {
def hasShuffleRead(stageID: Int): Boolean = { def hasShuffleRead(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks // This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.shuffleReadMetrics.isDefined if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
} }
return false // No tasks have finished for this stage return false // No tasks have finished for this stage
} }
...@@ -120,7 +120,7 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -120,7 +120,7 @@ private[spark] class JobProgressListener extends SparkListener {
def hasShuffleWrite(stageID: Int): Boolean = { def hasShuffleWrite(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks // This is written in a slightly complicated way to avoid having to scan all tasks
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.shuffleWriteMetrics.isDefined if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
} }
return false // No tasks have finished for this stage return false // No tasks have finished for this stage
} }
......
...@@ -52,7 +52,7 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -52,7 +52,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
} }
else { else {
val serviceTimes = validTasks.map{case (info, metrics, exception) => val serviceTimes = validTasks.map{case (info, metrics, exception) =>
metrics.executorRunTime.toDouble} metrics.get.executorRunTime.toDouble}
val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong)) ms => parent.formatDuration(ms.toLong))
...@@ -61,13 +61,13 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -61,13 +61,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleReadSizes = validTasks.map { val shuffleReadSizes = validTasks.map {
case(info, metrics, exception) => case(info, metrics, exception) =>
metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
} }
val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
val shuffleWriteSizes = validTasks.map { val shuffleWriteSizes = validTasks.map {
case(info, metrics, exception) => case(info, metrics, exception) =>
metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
} }
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
...@@ -87,21 +87,21 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -87,21 +87,21 @@ private[spark] class StagePage(parent: JobProgressUI) {
} }
def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = { def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>) trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData val (info, metrics, exception) = taskData
<tr> <tr>
<td>{info.taskId}</td> <td>{info.taskId}</td>
<td sorttable_customkey={Option(metrics).map{m => m.executorRunTime.toString}.getOrElse("1")}> <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
{Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")} {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
</td> </td>
<td>{info.taskLocality}</td> <td>{info.taskLocality}</td>
<td>{info.hostPort}</td> <td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td> <td>{dateFmt.format(new Date(info.launchTime))}</td>
{Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s => {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")} <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
{Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s => {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
<td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")} <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
<td>{exception.map(e => <td>{exception.map(e =>
<span> <span>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment