From 33bae585d4cb25aed2ac32e0d1248f78cc65318b Mon Sep 17 00:00:00 2001
From: Carson Wang <carson.wang@intel.com>
Date: Fri, 14 Aug 2015 13:38:25 -0700
Subject: [PATCH] [SPARK-9809] Task crashes because the internal accumulators
 are not properly initialized

When a stage failed and another stage was resubmitted with only part of partitions to compute, all the tasks failed with error message: java.util.NoSuchElementException: key not found: peakExecutionMemory.
This is because the internal accumulators are not properly initialized for this stage while other codes assume the internal accumulators always exist.

Author: Carson Wang <carson.wang@intel.com>

Closes #8090 from carsonwang/SPARK-9809.
---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf50a..f1c63d0876 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
       }
     }
 
+    // Create internal accumulators if the stage has no accumulators initialized.
     // Reset internal accumulators only if this stage is not partially submitted
     // Otherwise, we may override existing accumulator values from some tasks
-    if (allPartitions == partitionsToCompute) {
+    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
       stage.resetInternalAccumulators()
     }
 
-- 
GitLab