Skip to content
Snippets Groups Projects
Commit e1aaab1e authored by Wenchen Fan's avatar Wenchen Fan Committed by Marcelo Vanzin
Browse files

[SPARK-12837][SPARK-20666][CORE][FOLLOWUP] getting name should not fail if...

[SPARK-12837][SPARK-20666][CORE][FOLLOWUP] getting name should not fail if accumulator is garbage collected

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/17596 , we do not send internal accumulator name to executor side anymore, and always look up the accumulator name in `AccumulatorContext`.

This cause a regression if the accumulator is already garbage collected, this PR fixes this by still sending accumulator name for `SQLMetrics`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #17931 from cloud-fan/bug.
parent 9970aa09
No related branches found
No related tags found
No related merge requests found
......@@ -84,10 +84,11 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
* Returns the name of this accumulator, can only be called after registration.
*/
final def name: Option[String] = {
assertMetadataNotNull()
if (atDriverSide) {
AccumulatorContext.get(id).flatMap(_.metadata.name)
metadata.name.orElse(AccumulatorContext.get(id).flatMap(_.metadata.name))
} else {
assertMetadataNotNull()
metadata.name
}
}
......@@ -165,13 +166,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
}
val copyAcc = copyAndReset()
assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
val isInternalAcc =
(name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
getClass.getSimpleName == "SQLMetric"
val isInternalAcc = name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)
if (isInternalAcc) {
// Do not serialize the name of internal accumulator and send it to executor.
copyAcc.metadata = metadata.copy(name = None)
} else {
// For non-internal accumulators, we still need to send the name because users may need to
// access the accumulator name at executor side, or they may keep the accumulators sent from
// executors and access the name when the registered accumulator is already garbage
// collected(e.g. SQLMetrics).
copyAcc.metadata = metadata
}
copyAcc
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment