Commit d989434e authored by Carson Wang, committed by Wenchen Fan

[SPARK-19674][SQL] Ignore driver accumulator updates that don't belong to the execution when merging all accumulator updates

N.B. This is a backport to branch-2.1 of #17009.

## What changes were proposed in this pull request?
In `SQLListener.getExecutionMetrics`, driver accumulator updates that don't belong to the execution should be ignored when merging all accumulator updates, to prevent a `NoSuchElementException` when the metric type for an unknown accumulator id is looked up.
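
For illustration, here is a minimal self-contained sketch of the failure mode and the guard; `MissingKeyDemo`, `accumulatorMetrics`, and the sample ids are hypothetical stand-ins, not Spark's actual classes or data:

```scala
// Sketch only: Map.apply throws NoSuchElementException on a missing key,
// so a driver update whose id is absent from the execution's known
// accumulator metrics breaks the merge.
object MissingKeyDemo {
  def main(args: Array[String]): Unit = {
    // Known accumulator ids for this execution (stand-in for
    // executionUIData.accumulatorMetrics).
    val accumulatorMetrics = Map(1L -> "sum", 2L -> "sum")

    // A driver accumulator update that belongs to a different execution.
    val staleUpdate = (999L, 2L)

    // Unguarded lookup: this is where the pre-patch code blew up.
    try accumulatorMetrics(staleUpdate._1)
    catch { case e: NoSuchElementException => println(s"lookup failed: ${e.getMessage}") }

    // Guarded: filter ids against the known metrics first, as the patch does.
    val safe = Seq(staleUpdate, (1L, 10L)).filter {
      case (id, _) => accumulatorMetrics.contains(id)
    }
    safe.foreach { case (id, v) => println(s"$id (${accumulatorMetrics(id)}) -> $v") }
  }
}
```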

## How was this patch tested?
Updated unit test.

Author: Carson Wang <carson.wang@intel.com>

Closes #17418 from mallman/spark-19674-backport_2.1.
Parent: 92f0b012
```diff
@@ -343,10 +343,13 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging {
                accumulatorUpdate <- taskMetrics.accumulatorUpdates) yield {
             (accumulatorUpdate._1, accumulatorUpdate._2)
           }
-        }.filter { case (id, _) => executionUIData.accumulatorMetrics.contains(id) }
+        }
 
         val driverUpdates = executionUIData.driverAccumUpdates.toSeq
-        mergeAccumulatorUpdates(accumulatorUpdates ++ driverUpdates, accumulatorId =>
+        val totalUpdates = (accumulatorUpdates ++ driverUpdates).filter {
+          case (id, _) => executionUIData.accumulatorMetrics.contains(id)
+        }
+        mergeAccumulatorUpdates(totalUpdates, accumulatorId =>
           executionUIData.accumulatorMetrics(accumulatorId).metricType)
       case None =>
         // This execution has been dropped
```
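
As a runnable sketch of the shape this hunk converges on: filter the combined task and driver updates against the known accumulator ids before any per-id lookup. `FilterThenMerge` and `mergeUpdates` are hypothetical stand-ins for the listener and `mergeAccumulatorUpdates`, with invented sample data:

```scala
object FilterThenMerge {
  // Stand-in for mergeAccumulatorUpdates: sums the updates per accumulator id.
  def mergeUpdates(updates: Seq[(Long, Long)]): Map[Long, Long] =
    updates.groupBy(_._1).map { case (id, us) => id -> us.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val accumulatorMetrics = Map(1L -> "sum", 2L -> "sum")
    val taskUpdates   = Seq((1L, 10L), (2L, 5L), (1L, 10L))
    val driverUpdates = Seq((2L, 1L), (999L, 2L)) // 999L belongs to another execution

    // Filter the *combined* sequence so driver updates are screened too,
    // not just task updates.
    val totalUpdates = (taskUpdates ++ driverUpdates).filter {
      case (id, _) => accumulatorMetrics.contains(id)
    }
    println(mergeUpdates(totalUpdates)) // Map(1 -> 20, 2 -> 6)
  }
}
```

Filtering after concatenation, rather than filtering only the task-side sequence, is what closes the gap the bug exploited.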
```diff
@@ -147,6 +147,11 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
 
+    // Driver accumulator updates that don't belong to this execution should be
+    // filtered out, and no exception will be thrown.
+    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
+    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
+
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
```