From 983fa2d62029e7334fb661cb65c8cadaa4b86d1c Mon Sep 17 00:00:00 2001
From: Josh Rosen <joshrosen@databricks.com>
Date: Fri, 19 Feb 2016 15:57:23 -0800
Subject: [PATCH] [SPARK-13407] Guard against garbage-collected accumulators in
 TaskMetrics.fromAccumulatorUpdates

`TaskMetrics.fromAccumulatorUpdates()` can fail if accumulators have been
garbage-collected on the driver. To guard against this, this patch introduces
`ListenerTaskMetrics`, a subclass of `TaskMetrics` which is used only in
`TaskMetrics.fromAccumulatorUpdates()` and which eliminates the need to access
the original accumulators on the driver.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11276 from JoshRosen/accum-updates-fix.
---
 .../apache/spark/executor/TaskMetrics.scala   | 55 ++++++++++---------
 .../spark/executor/TaskMetricsSuite.scala     | 10 ++--
 2 files changed, 33 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 8ff0620f83..9da9cb5940 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -364,6 +364,27 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
 }
 
 
+/**
+ * Internal subclass of [[TaskMetrics]] which is used only for posting events to listeners.
+ * Its purpose is to obviate the need for the driver to reconstruct the original accumulators,
+ * which might have been garbage-collected. See SPARK-13407 for more details.
+ *
+ * Instances of this class should be considered read-only and users should not call `inc*()` or
+ * `set*()` methods. While we could override the setter methods to throw
+ * UnsupportedOperationException, we choose not to do so because the overrides would quickly become
+ * out-of-date when new metrics are added.
+ */
+private[spark] class ListenerTaskMetrics(
+    initialAccums: Seq[Accumulator[_]],
+    accumUpdates: Seq[AccumulableInfo]) extends TaskMetrics(initialAccums) {
+
+  override def accumulatorUpdates(): Seq[AccumulableInfo] = accumUpdates
+
+  override private[spark] def registerAccumulator(a: Accumulable[_, _]): Unit = {
+    throw new UnsupportedOperationException("This TaskMetrics is read-only")
+  }
+}
+
 private[spark] object TaskMetrics extends Logging {
 
   def empty: TaskMetrics = new TaskMetrics
@@ -397,33 +418,15 @@ private[spark] object TaskMetrics extends Logging {
     // Initial accumulators are passed into the TaskMetrics constructor first because these
     // are required to be uniquely named. The rest of the accumulators from this task are
     // registered later because they need not satisfy this requirement.
-    val (initialAccumInfos, otherAccumInfos) = accumUpdates
-      .filter { info => info.update.isDefined }
-      .partition { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
-    val initialAccums = initialAccumInfos.map { info =>
-      val accum = InternalAccumulator.create(info.name.get)
-      accum.setValueAny(info.update.get)
-      accum
-    }
-    // We don't know the types of the rest of the accumulators, so we try to find the same ones
-    // that were previously registered here on the driver and make copies of them. It is important
-    // that we copy the accumulators here since they are used across many tasks and we want to
-    // maintain a snapshot of their local task values when we post them to listeners downstream.
-    val otherAccums = otherAccumInfos.flatMap { info =>
-      val id = info.id
-      val acc = Accumulators.get(id).map { a =>
-        val newAcc = a.copy()
-        newAcc.setValueAny(info.update.get)
-        newAcc
+    val definedAccumUpdates = accumUpdates.filter { info => info.update.isDefined }
+    val initialAccums = definedAccumUpdates
+      .filter { info => info.name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX)) }
+      .map { info =>
+        val accum = InternalAccumulator.create(info.name.get)
+        accum.setValueAny(info.update.get)
+        accum
       }
-      if (acc.isEmpty) {
-        logWarning(s"encountered unregistered accumulator $id when reconstructing task metrics.")
-      }
-      acc
-    }
-    val metrics = new TaskMetrics(initialAccums)
-    otherAccums.foreach(metrics.registerAccumulator)
-    metrics
+    new ListenerTaskMetrics(initialAccums, definedAccumUpdates)
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 3a1a67cdc0..d91f50f18f 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -475,10 +475,9 @@ class TaskMetricsSuite extends SparkFunSuite {
     }
     val metrics1 = TaskMetrics.fromAccumulatorUpdates(accumUpdates1)
     assertUpdatesEquals(metrics1.accumulatorUpdates(), accumUpdates1)
-    // Test this with additional accumulators. Only the ones registered with `Accumulators`
-    // will show up in the reconstructed TaskMetrics. In practice, all accumulators created
+    // Test this with additional accumulators to ensure that we do not crash when handling
+    // updates from unregistered accumulators. In practice, all accumulators created
     // on the driver, internal or not, should be registered with `Accumulators` at some point.
-    // Here we show that reconstruction will succeed even if there are unregistered accumulators.
     val param = IntAccumulatorParam
     val registeredAccums = Seq(
       new Accumulator(0, param, Some("a"), internal = true, countFailedValues = true),
@@ -497,9 +496,8 @@ class TaskMetricsSuite extends SparkFunSuite {
     val registeredAccumInfos = registeredAccums.map(makeInfo)
     val unregisteredAccumInfos = unregisteredAccums.map(makeInfo)
     val accumUpdates2 = accumUpdates1 ++ registeredAccumInfos ++ unregisteredAccumInfos
-    val metrics2 = TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
-    // accumulators that were not registered with `Accumulators` will not show up
-    assertUpdatesEquals(metrics2.accumulatorUpdates(), accumUpdates1 ++ registeredAccumInfos)
+    // Simply checking that this does not crash:
+    TaskMetrics.fromAccumulatorUpdates(accumUpdates2)
   }
 }
-- 
GitLab
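
A minimal, self-contained Scala sketch (not Spark code; every name below is
hypothetical) of the pattern this patch applies: the listener-facing object
captures a snapshot of the accumulator updates when it is constructed and
serves all reads from that snapshot, so it never has to look up driver-side
accumulators that may already have been garbage-collected, and it rejects any
further registration.

  // Base type with a mutable registry, standing in for TaskMetrics.
  class Metrics(initial: Seq[Long]) {
    protected var registered: Seq[Long] = initial
    def registerUpdate(v: Long): Unit = { registered = registered :+ v }
    def updates(): Seq[Long] = registered
  }

  // Read-only view, standing in for ListenerTaskMetrics: it returns the
  // snapshot passed to its constructor and refuses mutation.
  class ListenerMetrics(initial: Seq[Long], snapshot: Seq[Long])
    extends Metrics(initial) {
    override def updates(): Seq[Long] = snapshot
    override def registerUpdate(v: Long): Unit =
      throw new UnsupportedOperationException("read-only")
  }

  object Demo extends App {
    val m = new ListenerMetrics(Seq(1L, 2L), Seq(1L, 2L, 3L))
    println(m.updates())  // List(1, 2, 3), served entirely from the snapshot
  }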