diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b2aa8bfbe70094fe7932414dac1c563e5085a97b..2516b674fea855c9b69372251ececcdb7c537f23 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       execSummary.taskTime += info.duration
       stageData.numActiveTasks -= 1
 
-      val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
+      val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageData.completedIndices.add(info.index)
             stageData.numCompleteTasks += 1
-            (None, Option(taskEnd.taskMetrics))
-          case e: ExceptionFailure => // Handle ExceptionFailure because we might have metrics
+            (None, taskEnd.taskMetrics.accumulatorUpdates())
+          case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), e.metrics)
-          case e: TaskFailedReason =>  // All other failure cases
+            (Some(e.toErrorString), e.accumUpdates)
+          case e: TaskFailedReason => // All other failure cases
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), None)
+            (Some(e.toErrorString), Seq.empty[AccumulableInfo])
         }
 
-      metrics.foreach { m =>
+      val taskMetrics =
+        if (accums.nonEmpty) {
+          Some(TaskMetrics.fromAccumulatorUpdates(accums))
+        } else {
+          None
+        }
+      taskMetrics.foreach { m =>
         val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }
 
       val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
       taskData.taskInfo = info
-      taskData.taskMetrics = metrics
+      taskData.taskMetrics = taskMetrics
       taskData.errorMessage = errorMessage
 
       for (
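
Note on the pattern this hunk introduces: instead of receiving a ready-made `TaskMetrics` (an `Option[TaskMetrics]`), the listener now receives the raw accumulator updates a task reported (`Seq[AccumulableInfo]`) and rebuilds a metrics object on its side, but only when updates actually exist, so tasks that reported nothing contribute `None` rather than an empty metrics object. The sketch below illustrates that reconstruct-from-updates pattern in isolation; `AccumUpdate` and `Metrics` are hypothetical stand-ins for Spark's `AccumulableInfo` and `TaskMetrics`, and only the `fromAccumulatorUpdates` name mirrors the diff, so this is an assumption-laden illustration, not Spark's actual implementation.

```scala
// Hypothetical stand-in for AccumulableInfo: one accumulator update from a task.
case class AccumUpdate(id: Long, name: String, value: Long)

// Hypothetical stand-in for TaskMetrics: an aggregate view over the updates.
case class Metrics(values: Map[String, Long])

object Metrics {
  // Mirrors the shape of TaskMetrics.fromAccumulatorUpdates used in the hunk:
  // rebuild a metrics object from the raw updates a task reported.
  def fromAccumulatorUpdates(updates: Seq[AccumUpdate]): Metrics =
    Metrics(updates.map(u => u.name -> u.value).toMap)
}

// The listener-side step added by the diff: materialize a metrics object
// only when the task actually reported accumulator updates.
def toMetrics(accums: Seq[AccumUpdate]): Option[Metrics] =
  if (accums.nonEmpty) Some(Metrics.fromAccumulatorUpdates(accums)) else None

// Usage:
//   toMetrics(Seq(AccumUpdate(1L, "bytesRead", 1024L)))  // Some(Metrics(Map(bytesRead -> 1024)))
//   toMetrics(Seq.empty)                                 // None, e.g. a failure with no updates
```

Keeping the result an `Option` preserves the old `metrics.foreach { m => ... }` call shape, which is why the surrounding `updateAggregateMetrics` code in the hunk is unchanged apart from the variable name.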