From 42904b8d013e71d03e301c3da62e33b4cc2eb54e Mon Sep 17 00:00:00 2001
From: Davies Liu <davies.liu@gmail.com>
Date: Thu, 11 Sep 2014 18:53:26 -0700
Subject: [PATCH] [SPARK-3465] fix task metrics aggregation in local mode

Before overwrite t.taskMetrics, take a deepcopy of it.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2338 from davies/fix_metric and squashes the following commits:

a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode
---
 .../scala/org/apache/spark/executor/Executor.scala    | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc65d..acae448a9c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
             if (!taskRunner.attemptedTask.isEmpty) {
               Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                 metrics.updateShuffleReadMetrics
-                tasksMetrics += ((taskRunner.taskId, metrics))
+                if (isLocal) {
+                  // JobProgressListener will hold an reference of it during
+                  // onExecutorMetricsUpdate(), then JobProgressListener can not see
+                  // the changes of metrics any more, so make a deep copy of it
+                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+                } else {
+                  // It will be copied by serialization
+                  tasksMetrics += ((taskRunner.taskId, metrics))
+                }
               }
             }
           }
-- 
GitLab