From 56d88247f14ca54750816748f5b6b2aca7bc6fea Mon Sep 17 00:00:00 2001
From: GayathriMurali <gayathri.m.softie@gmail.com>
Date: Wed, 16 Mar 2016 09:38:41 +0000
Subject: [PATCH] =?UTF-8?q?[SPARK-13396]=20Stop=20using=20our=20internal?=
 =?UTF-8?q?=20deprecated=20.metrics=20on=20Exceptio=E2=80=A6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

JIRA: https://issues.apache.org/jira/browse/SPARK-13396

Stop using our internal deprecated .metrics on ExceptionFailure instead use accumUpdates

Author: GayathriMurali <gayathri.m.softie@gmail.com>

Closes #11544 from GayathriMurali/SPARK-13396.
---
 .../spark/ui/jobs/JobProgressListener.scala   | 22 ++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index b2aa8bfbe7..2516b674fe 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -374,28 +374,34 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       execSummary.taskTime += info.duration
       stageData.numActiveTasks -= 1
 
-      val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
+      val (errorMessage, accums): (Option[String], Seq[AccumulableInfo]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
             stageData.completedIndices.add(info.index)
             stageData.numCompleteTasks += 1
-            (None, Option(taskEnd.taskMetrics))
-          case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
+            (None, taskEnd.taskMetrics.accumulatorUpdates())
+          case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), e.metrics)
-          case e: TaskFailedReason =>  // All other failure cases
+            (Some(e.toErrorString), e.accumUpdates)
+          case e: TaskFailedReason => // All other failure cases
             stageData.numFailedTasks += 1
-            (Some(e.toErrorString), None)
+            (Some(e.toErrorString), Seq.empty[AccumulableInfo])
         }
 
-      metrics.foreach { m =>
+      val taskMetrics =
+        if (accums.nonEmpty) {
+          Some(TaskMetrics.fromAccumulatorUpdates(accums))
+        } else {
+          None
+        }
+      taskMetrics.foreach { m =>
         val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }
 
       val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
       taskData.taskInfo = info
-      taskData.taskMetrics = metrics
+      taskData.taskMetrics = taskMetrics
       taskData.errorMessage = errorMessage
 
       for (
-- 
GitLab