Skip to content
Snippets Groups Projects
Commit abc78cd3 authored by Karen Feng's avatar Karen Feng
Browse files

Modifies instead of copies HashSets, fixes comment style

parent 383684da
No related branches found
No related tags found
No related merge requests found
......@@ -52,7 +52,7 @@ class DAGScheduler(
}
taskSched.setListener(this)
//Called by TaskScheduler to report task's starting.
// Called by TaskScheduler to report task's starting.
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventQueue.put(BeginEvent(task, taskInfo))
}
......
......@@ -121,14 +121,16 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) +
taskStart.taskInfo.taskId
if (!executorToTasksActive.contains(eid))
executorToTasksActive(eid) = HashSet[Long]()
executorToTasksActive(eid) += taskStart.taskInfo.taskId
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val eid = taskEnd.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, HashSet[Long]()) -
taskEnd.taskInfo.taskId
if (!executorToTasksActive.contains(eid))
executorToTasksActive(eid) = HashSet[Long]()
executorToTasksActive(eid) -= taskStart.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment