Commit 8867cd0b authored by Aaron Davidson, committed by Patrick Wendell

SPARK-1097: Do not introduce deadlock while fixing concurrency bug

We recently added this lock on 'conf' in order to prevent concurrent creation. However, it turns out that this can introduce a deadlock because Hadoop also synchronizes on the Configuration objects when creating new Configurations (and they do so via a static REGISTRY which contains all created Configurations).

This fix forces all Spark-side initialization of Configuration objects to occur serially, by synchronizing on a static lock that we control, and thus avoids introducing the deadlock.
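The pattern behind the fix can be sketched as follows (a minimal illustration, not the actual Spark/Hadoop code; `ConfFactory`, `CONSTRUCTION_LOCK`, and `newConf` are hypothetical names). Instead of synchronizing on the shared object being copied, which an external library may also lock, construction is serialized under a private static lock that only our own code ever acquires, so it cannot participate in a lock-ordering cycle:

```scala
// Hypothetical sketch of the locking pattern used in this fix.
object ConfFactory {
  // A static lock that only this code synchronizes on. Because no other
  // library ever locks this object, it cannot form a lock-ordering cycle
  // with locks held internally by Hadoop's Configuration machinery.
  private val CONSTRUCTION_LOCK = new Object()

  // Stand-in for `new JobConf(conf)`: copying shared state is the
  // non-thread-safe step that must be serialized.
  def newConf(shared: collection.mutable.Map[String, String]): Map[String, String] =
    CONSTRUCTION_LOCK.synchronized {
      shared.toMap
    }
}
```

The key property is that the lock is private to our code: threads serialize with each other here, but never wait on a monitor that Hadoop might already hold.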

Author: Aaron Davidson <aaron@databricks.com>

Closes #1409 from aarondav/1054 and squashes the following commits:

7d1b769 [Aaron Davidson] SPARK-1097: Do not introduce deadlock while fixing concurrency bug
parent 7c8d1232
@@ -140,8 +140,8 @@ class HadoopRDD[K, V](
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
-      conf.synchronized {
+      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
+      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
         val newJobConf = new JobConf(conf)
         initLocalJobConfFuncOpt.map(f => f(newJobConf))
         HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
@@ -246,6 +246,9 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
+  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
   /**
    * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
    * the local process.