Skip to content
Snippets Groups Projects
Commit c7b27889 authored by Karen Feng
Browse files

Merge branch 'master' of https://github.com/mesos/spark into bootstrap-update

Conflicts:
	core/src/main/scala/spark/ui/jobs/IndexPage.scala
parents 478a2886 c99b6744
No related branches found
No related tags found
No related merge requests found
...@@ -113,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -113,7 +113,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
} }
private[spark] class ExecutorsListener extends SparkListener with Logging { private[spark] class ExecutorsListener extends SparkListener with Logging {
val executorToTasksActive = HashMap[String, HashSet[Long]]() val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos = val executorToTaskInfos =
...@@ -121,9 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -121,9 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) { override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId val eid = taskStart.taskInfo.executorId
if (!executorToTasksActive.contains(eid)) val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
executorToTasksActive(eid) = HashSet[Long]() activeTasks += taskStart.taskInfo
executorToTasksActive(eid) += taskStart.taskInfo.taskId
val taskList = executorToTaskInfos.getOrElse( val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None)) taskList += ((taskStart.taskInfo, None, None))
...@@ -132,9 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -132,9 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId val eid = taskEnd.taskInfo.executorId
if (!executorToTasksActive.contains(eid)) val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
executorToTasksActive(eid) = HashSet[Long]() activeTasks -= taskEnd.taskInfo
executorToTasksActive(eid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match { taskEnd.reason match {
case e: ExceptionFailure => case e: ExceptionFailure =>
...@@ -142,7 +140,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -142,7 +140,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
(Some(e), e.metrics) (Some(e), e.metrics)
case _ => case _ =>
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Some(taskEnd.taskMetrics)) (None, Option(taskEnd.taskMetrics))
} }
val taskList = executorToTaskInfos.getOrElse( val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
......
...@@ -25,9 +25,10 @@ import scala.Some ...@@ -25,9 +25,10 @@ import scala.Some
import scala.xml.{NodeSeq, Node} import scala.xml.{NodeSeq, Node}
import spark.scheduler.Stage import spark.scheduler.Stage
import spark.ui.UIUtils._
import spark.ui.Page._
import spark.storage.StorageLevel import spark.storage.StorageLevel
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
/** Page showing list of all ongoing and recently finished stages */ /** Page showing list of all ongoing and recently finished stages */
private[spark] class IndexPage(parent: JobProgressUI) { private[spark] class IndexPage(parent: JobProgressUI) {
...@@ -38,6 +39,12 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -38,6 +39,12 @@ private[spark] class IndexPage(parent: JobProgressUI) {
val activeStages = listener.activeStages.toSeq val activeStages = listener.activeStages.toSeq
val completedStages = listener.completedStages.reverse.toSeq val completedStages = listener.completedStages.reverse.toSeq
val failedStages = listener.failedStages.reverse.toSeq val failedStages = listener.failedStages.reverse.toSeq
val now = System.currentTimeMillis()
var activeTime = 0L
for (tasks <- listener.stageToTasksActive.values; t <- tasks) {
activeTime += t.timeRunning(now)
}
/** Special table which merges two header cells. */ /** Special table which merges two header cells. */
def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
...@@ -48,7 +55,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -48,7 +55,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
<th>Submitted</th> <th>Submitted</th>
<th>Duration</th> <th>Duration</th>
<th colspan="2">Tasks: Complete/Total</th> <th colspan="2">Tasks: Complete/Total</th>
<th>Shuffle Activity</th> <th>Shuffle Read</th>
<th>Shuffle Write</th>
<th>Stored RDD</th> <th>Stored RDD</th>
</thead> </thead>
<tbody> <tbody>
...@@ -57,11 +65,33 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -57,11 +65,33 @@ private[spark] class IndexPage(parent: JobProgressUI) {
</table> </table>
} }
val summary: NodeSeq =
<div>
<ul class="unstyled">
<li>
<strong>CPU time: </strong>
{parent.formatDuration(listener.totalTime + activeTime)}
</li>
{if (listener.totalShuffleRead > 0)
<li>
<strong>Shuffle read: </strong>
{Utils.memoryBytesToString(listener.totalShuffleRead)}
</li>
}
{if (listener.totalShuffleWrite > 0)
<li>
<strong>Shuffle write: </strong>
{Utils.memoryBytesToString(listener.totalShuffleWrite)}
</li>
}
</ul>
</div>
val activeStageTable: NodeSeq = stageTable(stageRow, activeStages) val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
val completedStageTable = stageTable(stageRow, completedStages) val completedStageTable = stageTable(stageRow, completedStages)
val failedStageTable: NodeSeq = stageTable(stageRow, failedStages) val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
val content = <h2>Active Stages</h2> ++ activeStageTable ++ val content = summary ++
<h2>Active Stages</h2> ++ activeStageTable ++
<h2>Completed Stages</h2> ++ completedStageTable ++ <h2>Completed Stages</h2> ++ completedStageTable ++
<h2>Failed Stages</h2> ++ failedStageTable <h2>Failed Stages</h2> ++ failedStageTable
...@@ -91,14 +121,16 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -91,14 +121,16 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case Some(t) => dateFmt.format(new Date(t)) case Some(t) => dateFmt.format(new Date(t))
case None => "Unknown" case None => "Unknown"
} }
val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id))
val shuffleInfo = (read, write) match { val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case (true, true) => "Read/Write" case 0 => ""
case (true, false) => "Read" case b => Utils.memoryBytesToString(b)
case (false, true) => "Write" }
case _ => "" val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
case b => Utils.memoryBytesToString(b)
} }
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, Seq[Long]()).size
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions val totalTasks = s.numPartitions
...@@ -115,7 +147,8 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -115,7 +147,8 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case _ => case _ =>
}} }}
</td> </td>
<td>{shuffleInfo}</td> <td>{shuffleRead}</td>
<td>{shuffleWrite}</td>
<td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) { <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
<a href={"/storage/rdd?id=%s".format(s.rdd.id)}> <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
{Option(s.rdd.name).getOrElse(s.rdd.id)} {Option(s.rdd.name).getOrElse(s.rdd.id)}
......
...@@ -65,7 +65,15 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -65,7 +65,15 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]() val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]() val failedStages = ListBuffer[Stage]()
val stageToTasksActive = HashMap[Int, HashSet[Long]]() // Total metrics reflect metrics only for completed tasks
var totalTime = 0L
var totalShuffleRead = 0L
var totalShuffleWrite = 0L
val stageToTime = HashMap[Int, Long]()
val stageToShuffleRead = HashMap[Int, Long]()
val stageToShuffleWrite = HashMap[Int, Long]()
val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos = val stageToTaskInfos =
...@@ -86,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -86,6 +94,12 @@ private[spark] class JobProgressListener extends SparkListener {
val toRemove = RETAINED_STAGES / 10 val toRemove = RETAINED_STAGES / 10
stages.takeRight(toRemove).foreach( s => { stages.takeRight(toRemove).foreach( s => {
stageToTaskInfos.remove(s.id) stageToTaskInfos.remove(s.id)
stageToTime.remove(s.id)
stageToShuffleRead.remove(s.id)
stageToShuffleWrite.remove(s.id)
stageToTasksActive.remove(s.id)
stageToTasksComplete.remove(s.id)
stageToTasksFailed.remove(s.id)
}) })
stages.trimEnd(toRemove) stages.trimEnd(toRemove)
} }
...@@ -96,9 +110,8 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -96,9 +110,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) { override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId val sid = taskStart.task.stageId
if (!stageToTasksActive.contains(sid)) val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
stageToTasksActive(sid) = HashSet[Long]() tasksActive += taskStart.taskInfo
stageToTasksActive(sid) += taskStart.taskInfo.taskId
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None)) taskList += ((taskStart.taskInfo, None, None))
...@@ -107,9 +120,8 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -107,9 +120,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId val sid = taskEnd.task.stageId
if (!stageToTasksActive.contains(sid)) val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
stageToTasksActive(sid) = HashSet[Long]() tasksActive -= taskEnd.taskInfo
stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match { taskEnd.reason match {
case e: ExceptionFailure => case e: ExceptionFailure =>
...@@ -117,8 +129,26 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -117,8 +129,26 @@ private[spark] class JobProgressListener extends SparkListener {
(Some(e), e.metrics) (Some(e), e.metrics)
case _ => case _ =>
stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
(None, Some(taskEnd.taskMetrics)) (None, Option(taskEnd.taskMetrics))
} }
stageToTime.getOrElseUpdate(sid, 0L)
val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time
totalTime += time
stageToShuffleRead.getOrElseUpdate(sid, 0L)
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead
stageToShuffleWrite.getOrElseUpdate(sid, 0L)
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite
totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None)) taskList -= ((taskEnd.taskInfo, None, None))
...@@ -139,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -139,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
case _ => case _ =>
} }
} }
/**
 * Is this stage's input from a shuffle read.
 *
 * Looks up the task entries recorded for `stageID` in `stageToTaskInfos` and answers
 * based on an entry's second tuple element (the optional task metrics): true when
 * those metrics carry `shuffleReadMetrics`.
 *
 * NOTE(review): elsewhere in this file the entries are built as
 * `(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])` and a started task is
 * added as `(taskInfo, None, None)`. If that holds, `s._2` is `None` rather than
 * `null` for an unfinished task, so the `!= null` guard does not skip it and the
 * first entry alone decides the result (false when its metrics are `None`).
 * Confirm against the listener before relying on this method.
 *
 * @param stageID id of the stage to inspect
 * @return true if the inspected entry has shuffle read metrics; false otherwise,
 *         including when no entries are recorded for the stage
 */
def hasShuffleRead(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks:
// the loop exits via `return` on the first entry whose metrics reference is non-null.
// The `return` sits inside a `for` body, so it is a non-local return from the method.
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
}
return false // Reached only when the loop never returned (e.g. no entries for this stage)
}
/**
 * Is this stage's output to a shuffle write.
 *
 * Looks up the task entries recorded for `stageID` in `stageToTaskInfos` and answers
 * based on an entry's second tuple element (the optional task metrics): true when
 * those metrics carry `shuffleWriteMetrics`.
 *
 * NOTE(review): elsewhere in this file the entries are built as
 * `(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])` and a started task is
 * added as `(taskInfo, None, None)`. If that holds, `s._2` is `None` rather than
 * `null` for an unfinished task, so the `!= null` guard does not skip it and the
 * first entry alone decides the result (false when its metrics are `None`).
 * Confirm against the listener before relying on this method.
 *
 * @param stageID id of the stage to inspect
 * @return true if the inspected entry has shuffle write metrics; false otherwise,
 *         including when no entries are recorded for the stage
 */
def hasShuffleWrite(stageID: Int): Boolean = {
// This is written in a slightly complicated way to avoid having to scan all tasks:
// the loop exits via `return` on the first entry whose metrics reference is non-null.
// The `return` sits inside a `for` body, so it is a non-local return from the method.
for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
}
return false // Reached only when the loop never returned (e.g. no entries for this stage)
}
} }
...@@ -37,6 +37,7 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -37,6 +37,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
def render(request: HttpServletRequest): Seq[Node] = { def render(request: HttpServletRequest): Seq[Node] = {
val stageId = request.getParameter("id").toInt val stageId = request.getParameter("id").toInt
val now = System.currentTimeMillis()
if (!listener.stageToTaskInfos.contains(stageId)) { if (!listener.stageToTaskInfos.contains(stageId)) {
val content = val content =
...@@ -49,8 +50,35 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -49,8 +50,35 @@ private[spark] class StagePage(parent: JobProgressUI) {
val tasks = listener.stageToTaskInfos(stageId) val tasks = listener.stageToTaskInfos(stageId)
val shuffleRead = listener.hasShuffleRead(stageId) val shuffleRead = listener.stageToShuffleRead(stageId) > 0
val shuffleWrite = listener.hasShuffleWrite(stageId) val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
var activeTime = 0L
listener.stageToTasksActive(stageId).foreach { t =>
activeTime += t.timeRunning(now)
}
val summary =
<div>
<ul class="unstyled">
<li>
<strong>CPU time: </strong>
{parent.formatDuration(listener.stageToTime(stageId) + activeTime)}
</li>
{if (shuffleRead)
<li>
<strong>Shuffle read: </strong>
{Utils.memoryBytesToString(listener.stageToShuffleRead(stageId))}
</li>
}
{if (shuffleWrite)
<li>
<strong>Shuffle write: </strong>
{Utils.memoryBytesToString(listener.stageToShuffleWrite(stageId))}
</li>
}
</ul>
</div>
val taskHeaders: Seq[String] = val taskHeaders: Seq[String] =
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++ Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
...@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -98,7 +126,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
} }
val content = val content =
<h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable; summary ++ <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++
<h2>Tasks</h2> ++ taskTable;
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment