Skip to content
Snippets Groups Projects
Commit 42904b8d authored by Davies Liu's avatar Davies Liu Committed by Andrew Or
Browse files

[SPARK-3465] fix task metrics aggregation in local mode

Before overwrite t.taskMetrics, take a deepcopy of it.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2338 from davies/fix_metric and squashes the following commits:

a5cdb63 [Davies Liu] Merge branch 'master' into fix_metric
7c879e0 [Davies Liu] add more comments
754b5b8 [Davies Liu] copy taskMetrics only when isLocal is true
5ca26dc [Davies Liu] fix task metrics aggregation in local mode
parent 33c7a738
No related branches found
No related tags found
No related merge requests found
......@@ -360,7 +360,16 @@ private[spark] class Executor(
if (!taskRunner.attemptedTask.isEmpty) {
Option(taskRunner.task).flatMap(_.metrics).foreach { metrics =>
metrics.updateShuffleReadMetrics
tasksMetrics += ((taskRunner.taskId, metrics))
if (isLocal) {
// JobProgressListener will hold an reference of it during
// onExecutorMetricsUpdate(), then JobProgressListener can not see
// the changes of metrics any more, so make a deep copy of it
val copiedMetrics = Utils.deserialize[TaskMetrics](Utils.serialize(metrics))
tasksMetrics += ((taskRunner.taskId, copiedMetrics))
} else {
// It will be copied by serialization
tasksMetrics += ((taskRunner.taskId, metrics))
}
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment