Skip to content
Snippets Groups Projects
Commit 9f2dbb2a authored by Karen Feng's avatar Karen Feng
Browse files

Adds/removes active tasks only once

parent 0200801a
No related branches found
No related tags found
No related merge requests found
......@@ -94,7 +94,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, 0).toString
val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
......@@ -114,7 +114,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
private[spark] class ExecutorsListener extends SparkListener with Logging {
val executorToTasksActive = HashMap[String, Int]()
val executorToTasksActive = HashMap[String, Seq[Long]]()
val executorToTasksComplete = HashMap[String, Int]()
val executorToTasksFailed = HashMap[String, Int]()
val executorToTaskInfos =
......@@ -122,12 +122,12 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()) :+ taskStart.taskInfo.taskId
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) - 1
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, Seq[Long]()).filterNot(_ == taskEnd.taskInfo.taskId)
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment