diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7ab5ccf50adb75d9f99b379d5f101da84a7b41fa..f1c63d08766c291e1bdb4e37036e2471e63aee70 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -790,9 +790,10 @@ class DAGScheduler(
       }
     }
 
+    // Create internal accumulators if the stage has no accumulators initialized.
     // Reset internal accumulators only if this stage is not partially submitted
     // Otherwise, we may override existing accumulator values from some tasks
-    if (allPartitions == partitionsToCompute) {
+    if (stage.internalAccumulators.isEmpty || allPartitions == partitionsToCompute) {
       stage.resetInternalAccumulators()
     }