Skip to content
Snippets Groups Projects
Commit 5a93e3c5 authored by Karen Feng's avatar Karen Feng
Browse files

Cleaned up code based on pwendell's suggestions

parent dcc4743a
No related branches found
No related tags found
No related merge requests found
...@@ -121,10 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -121,10 +121,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) { override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId val eid = taskStart.taskInfo.executorId
if (!executorToTasksActive.contains(eid)) { val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
executorToTasksActive(eid) = HashSet[TaskInfo]() activeTasks += taskStart.taskInfo
}
executorToTasksActive(eid) += taskStart.taskInfo
val taskList = executorToTaskInfos.getOrElse( val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None)) taskList += ((taskStart.taskInfo, None, None))
...@@ -133,10 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -133,10 +131,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId val eid = taskEnd.taskInfo.executorId
if (!executorToTasksActive.contains(eid)) { val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
executorToTasksActive(eid) = HashSet[TaskInfo]() activeTasks -= taskEnd.taskInfo
}
executorToTasksActive(eid) -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match { taskEnd.reason match {
case e: ExceptionFailure => case e: ExceptionFailure =>
......
...@@ -125,16 +125,14 @@ private[spark] class IndexPage(parent: JobProgressUI) { ...@@ -125,16 +125,14 @@ private[spark] class IndexPage(parent: JobProgressUI) {
case None => "Unknown" case None => "Unknown"
} }
val shuffleRead = val shuffleRead = listener.stageToShuffleRead(s.id) match {
if (!listener.hasShuffleRead(s.id)) case 0 => ""
"" case b => Utils.memoryBytesToString(b)
else }
Utils.memoryBytesToString(listener.stageToShuffleRead(s.id)) val shuffleWrite = listener.stageToShuffleWrite(s.id) match {
val shuffleWrite = case 0 => ""
if (!listener.hasShuffleWrite(s.id)) case b => Utils.memoryBytesToString(b)
"" }
else
Utils.memoryBytesToString(listener.stageToShuffleWrite(s.id))
val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
val totalTasks = s.numPartitions val totalTasks = s.numPartitions
......
...@@ -65,6 +65,7 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -65,6 +65,7 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]() val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]() val failedStages = ListBuffer[Stage]()
// Total metrics reflect metrics only for completed tasks
var totalTime = 0L var totalTime = 0L
var totalShuffleRead = 0L var totalShuffleRead = 0L
var totalShuffleWrite = 0L var totalShuffleWrite = 0L
...@@ -109,10 +110,8 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -109,10 +110,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) { override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId val sid = taskStart.task.stageId
if (!stageToTasksActive.contains(sid)) { val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
stageToTasksActive(sid) = HashSet[TaskInfo]() tasksActive += taskStart.taskInfo
}
stageToTasksActive(sid) += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None)) taskList += ((taskStart.taskInfo, None, None))
...@@ -121,10 +120,8 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -121,10 +120,8 @@ private[spark] class JobProgressListener extends SparkListener {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId val sid = taskEnd.task.stageId
if (!stageToTasksActive.contains(sid)) { val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
stageToTasksActive(sid) = HashSet[TaskInfo]() tasksActive -= taskEnd.taskInfo
}
stageToTasksActive(sid) -= taskEnd.taskInfo
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match { taskEnd.reason match {
case e: ExceptionFailure => case e: ExceptionFailure =>
...@@ -135,24 +132,18 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -135,24 +132,18 @@ private[spark] class JobProgressListener extends SparkListener {
(None, Option(taskEnd.taskMetrics)) (None, Option(taskEnd.taskMetrics))
} }
if (!stageToTime.contains(sid)) { stageToTime.getOrElseUpdate(sid, 0L)
stageToTime(sid) = 0L
}
val time = metrics.map(m => m.executorRunTime).getOrElse(0) val time = metrics.map(m => m.executorRunTime).getOrElse(0)
stageToTime(sid) += time stageToTime(sid) += time
totalTime += time totalTime += time
if (!stageToShuffleRead.contains(sid)) { stageToShuffleRead.getOrElseUpdate(sid, 0L)
stageToShuffleRead(sid) = 0L
}
val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
s.remoteBytesRead).getOrElse(0L) s.remoteBytesRead).getOrElse(0L)
stageToShuffleRead(sid) += shuffleRead stageToShuffleRead(sid) += shuffleRead
totalShuffleRead += shuffleRead totalShuffleRead += shuffleRead
if (!stageToShuffleWrite.contains(sid)) { stageToShuffleWrite.getOrElseUpdate(sid, 0L)
stageToShuffleWrite(sid) = 0L
}
val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
s.shuffleBytesWritten).getOrElse(0L) s.shuffleBytesWritten).getOrElse(0L)
stageToShuffleWrite(sid) += shuffleWrite stageToShuffleWrite(sid) += shuffleWrite
...@@ -178,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener { ...@@ -178,22 +169,4 @@ private[spark] class JobProgressListener extends SparkListener {
case _ => case _ =>
} }
} }
/** Is this stage's input from a shuffle read. */
def hasShuffleRead(stageID: Int): Boolean = {
  // Only the first task entry that carries metrics needs to be inspected: every
  // finished task of a stage agrees on whether shuffle-read metrics exist, so a
  // full scan is unnecessary. Using find/exists avoids the non-idiomatic
  // `return` from inside a for loop (a nonlocal return in Scala).
  stageToTaskInfos.getOrElse(stageID, Seq())
    .find(_._2 != null)                                    // first entry with metrics attached
    .exists(_._2.flatMap(_.shuffleReadMetrics).isDefined)  // false when no task has finished yet
}
/** Is this stage's output to a shuffle write. */
def hasShuffleWrite(stageID: Int): Boolean = {
  // Mirrors hasShuffleRead: inspect only the first task entry that carries
  // metrics, since all finished tasks of a stage agree on whether shuffle-write
  // metrics are present. find/exists replaces the non-idiomatic early `return`.
  stageToTaskInfos.getOrElse(stageID, Seq())
    .find(_._2 != null)                                     // first entry with metrics attached
    .exists(_._2.flatMap(_.shuffleWriteMetrics).isDefined)  // false when no task has finished yet
}
} }
0% Loading or an error occurred.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment