Commit 2df5f1f0 authored by Josh Rosen

[SPARK-6075] Fix bug that caused lost accumulator updates: do not store WeakReferences in localAccums map

This fixes a non-deterministic bug introduced in #4021 that could cause tasks' accumulator updates to be lost.  The problem is that `localAccums` must not hold weak references: once a task finishes running, there are no remaining strong references to its local accumulators, so they can be garbage-collected before the executor reads the `localAccums` map.  Weak references are unnecessary here anyway, since this map is cleared at the end of each task.
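To see why this is non-deterministic, here is a minimal, self-contained Scala sketch of the failure mode. It is not Spark code: the `WeakAccumDemo` object, the `ArrayBuffer` standing in for an accumulator, and the single-entry map are hypothetical, and whether the update is actually lost depends on whether the JVM collects the referent at the `System.gc()` hint.

import scala.collection.mutable
import scala.ref.WeakReference

object WeakAccumDemo {
  def main(args: Array[String]): Unit = {
    // A stand-in for the old localAccums: per-task accumulators held only weakly.
    val localAccums = mutable.Map[Long, WeakReference[mutable.ArrayBuffer[Int]]]()

    // The "task" creates an accumulator, registers it weakly, and finishes,
    // dropping the only strong reference.
    var acc: mutable.ArrayBuffer[Int] = mutable.ArrayBuffer(42)
    localAccums(1L) = WeakReference(acc)
    acc = null

    // Timing-dependent: a GC at this point may clear the weak reference.
    System.gc()

    // The "executor" then reads the map and may find the update gone.
    localAccums(1L).get match {
      case Some(values) => println(s"update seen: $values")
      case None         => println("update lost: accumulator was garbage-collected")
    }
  }
}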

Author: Josh Rosen <joshrosen@databricks.com>

Closes #4835 from JoshRosen/SPARK-6075 and squashes the following commits:

4f4b5b2 [Josh Rosen] Remove defensive assertions that caused test failures in code unrelated to this change
120c7b0 [Josh Rosen] [SPARK-6075] Do not store WeakReferences in localAccums map
parent 643300a6
@@ -280,15 +280,24 @@ object AccumulatorParam {

 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0

   def newId(): Long = synchronized {
     lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {

   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }

   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
     }
   }

@@ -320,12 +329,7 @@ private[spark] object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
     }
     return ret
   }
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
         case None =>
           throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
       }
+    } else {
+      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
     }
   }
 }
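For contrast, a minimal sketch of the fixed pattern (again with hypothetical names, not the actual Spark classes): the thread-local map holds strong references, so an update registered by a task cannot be collected before it is read, and the map is cleared explicitly once the values have been collected.

import scala.collection.mutable

object StrongAccumDemo {
  // A stand-in for the new localAccums: strong references in a thread-local map.
  private val localAccums = new ThreadLocal[mutable.Map[Long, mutable.ArrayBuffer[Int]]]() {
    override protected def initialValue() = mutable.Map[Long, mutable.ArrayBuffer[Int]]()
  }

  def main(args: Array[String]): Unit = {
    // The "task" registers its accumulator and finishes.
    localAccums.get()(1L) = mutable.ArrayBuffer(42)

    // A GC cannot reclaim the accumulator: the map still strongly references it.
    System.gc()

    // The "executor" reliably sees the update, then clears the map for the next task.
    println(s"update seen: ${localAccums.get()(1L)}")
    localAccums.get().clear()
  }
}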