Skip to content
Snippets Groups Projects
Commit 5133e4be authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #790 from kayousterhout/fix_throughput

Fixed issue in UI that decreased scheduler throughput by 5x or more
parents 3c8478e1 b88e2624
No related branches found
No related tags found
No related merge requests found
...@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -97,7 +97,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
.getOrElse(0).toString .getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString val totalTasks = activeTasks + failedTasks + completedTasks
Seq( Seq(
execId, execId,
...@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -117,17 +117,11 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]() val executorToTasksActive = HashMap[String, HashSet[TaskInfo]]()
val executorToTasksComplete = HashMap[String, Int]() val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]() val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onTaskStart(taskStart: SparkListenerTaskStart) { override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId val eid = taskStart.taskInfo.executorId
val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]()) val activeTasks = executorToTasksActive.getOrElseUpdate(eid, new HashSet[TaskInfo]())
activeTasks += taskStart.taskInfo activeTasks += taskStart.taskInfo
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
executorToTaskInfos(eid) = taskList
} }
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
...@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) { ...@@ -143,11 +137,6 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
(None, Option(taskEnd.taskMetrics)) (None, Option(taskEnd.taskMetrics))
} }
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
} }
} }
} }
package spark.ui.jobs package spark.ui.jobs
import scala.Seq import scala.Seq
import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import spark.{ExceptionFailure, SparkContext, Success, Utils} import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._ import spark.scheduler._
...@@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList ...@@ -34,7 +34,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos = val stageToTaskInfos =
HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() HashMap[Int, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
override def onJobStart(jobStart: SparkListenerJobStart) {} override def onJobStart(jobStart: SparkListenerJobStart) {}
...@@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList ...@@ -89,7 +89,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]()) val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
tasksActive += taskStart.taskInfo tasksActive += taskStart.taskInfo
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None)) taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList stageToTaskInfos(sid) = taskList
} }
...@@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList ...@@ -126,7 +126,7 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
totalShuffleWrite += shuffleWrite totalShuffleWrite += shuffleWrite
val taskList = stageToTaskInfos.getOrElse( val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None)) taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo)) taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList stageToTaskInfos(sid) = taskList
......
...@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) { ...@@ -48,7 +48,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
} }
val tasks = listener.stageToTaskInfos(stageId) val tasks = listener.stageToTaskInfos(stageId).toSeq
val shuffleRead = listener.stageToShuffleRead(stageId) > 0 val shuffleRead = listener.stageToShuffleRead(stageId) > 0
val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0 val shuffleWrite = listener.stageToShuffleWrite(stageId) > 0
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment