diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dd903dc65d204c1444659ce1f191dfb1b6e7721b..acae448a9c66f7f7a756b5ca6b4063d41f34a09d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -360,7 +360,16 @@ private[spark] class Executor(
             if (!taskRunner.attemptedTask.isEmpty) {
               Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
                 metrics.updateShuffleReadMetrics
-                tasksMetrics += ((taskRunner.taskId, metrics))
+                if (isLocal) {
+                  // JobProgressListener will hold an reference of it during
+                  // onExecutorMetricsUpdate(), then JobProgressListener can not see
+                  // the changes of metrics any more, so make a deep copy of it
+                  val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
+                  tasksMetrics += ((taskRunner.taskId, copiedMetrics))
+                } else {
+                  // It will be copied by serialization
+                  tasksMetrics += ((taskRunner.taskId, metrics))
+                }
               }
             }
           }