From 7920296bf8f313e010205937d3ebcbbc7b1a1d9e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh <simonh@tw.ibm.com> Date: Sun, 22 May 2016 08:08:46 -0500 Subject: [PATCH] [SPARK-15430][SQL] Fix potential ConcurrentModificationException for ListAccumulator ## What changes were proposed in this pull request? In `ListAccumulator` we create an unmodifiable view for underlying list. However, it doesn't prevent the underlying to be modified further. So as we access the unmodifiable list, the underlying list can be modified in the same time. It could cause `java.util.ConcurrentModificationException`. We can observe such exception in recent tests. To fix it, we can copy a list of the underlying list and then create the unmodifiable view of this list instead. ## How was this patch tested? The exception might be difficult to test. Existing tests should be passed. Author: Liang-Chi Hsieh <simonh@tw.ibm.com> Closes #13211 from viirya/fix-concurrentmodify. --- .../main/scala/org/apache/spark/util/AccumulatorV2.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 13cb6a28c3..21ba46024d 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.{lang => jl} import java.io.ObjectInputStream +import java.util.ArrayList import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicLong @@ -415,7 +416,7 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] { class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { - private val _list: java.util.List[T] = new java.util.ArrayList[T] + private val _list: java.util.List[T] = new ArrayList[T] override def isZero: Boolean = _list.isEmpty @@ -437,7 +438,9 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] { s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}") } - override def value: java.util.List[T] = java.util.Collections.unmodifiableList(_list) + override def value: java.util.List[T] = _list.synchronized { + java.util.Collections.unmodifiableList(new ArrayList[T](_list)) + } private[spark] def setValue(newValue: java.util.List[T]): Unit = { _list.clear() -- GitLab