Commit 2cd40db2 authored by Josh Rosen's avatar Josh Rosen Committed by Josh Rosen

[SPARK-2546] Clone JobConf for each task (branch-1.0 / 1.1 backport)

This patch attempts to fix SPARK-2546 in `branch-1.0` and `branch-1.1`.  The underlying problem is that thread-safety issues in Hadoop Configuration objects can cause Spark tasks to get stuck in infinite loops.  The approach taken here is to clone the JobConf once per task, rather than sharing a single copy between tasks.  Note that some Configuration thread-safety issues may still affect the driver, but these seem much less likely to occur in practice and will be more complex to fix (see the discussion on the SPARK-2546 ticket).
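The failure mode and the fix can be sketched in miniature without Hadoop. In the sketch below, a `java.util.Properties` stands in for Hadoop's mutable, non-thread-safe `Configuration`, and `runTask` is a hypothetical task body (none of these names are Spark's); the point is only that each task mutates a private clone, so no task can observe another's half-finished update to the shared object:

```scala
import java.util.Properties

object CloneConfSketch {
  // Hypothetical per-task body: work against a private clone of the shared
  // configuration, so mutations never touch (or race with) the shared copy.
  def runTask(shared: Properties, taskId: Int): Properties = {
    val local = shared.clone().asInstanceOf[Properties] // one clone per task
    local.setProperty("task.id", taskId.toString)       // local-only mutation
    local
  }

  def main(args: Array[String]): Unit = {
    val shared = new Properties()
    shared.setProperty("fs.defaultFS", "hdfs://example:8020") // illustrative value

    val threads = (1 to 4).map { id =>
      new Thread(() => { runTask(shared, id); () })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())

    // The shared configuration was never mutated by any task.
    assert(shared.getProperty("task.id") == null)
    println("shared conf untouched")
  }
}
```

Cloning trades extra memory and CPU per task for isolation, which is also why the patch makes it opt-in rather than unconditional.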

This cloning is guarded by a new configuration option (`spark.hadoop.cloneConf`) and is disabled by default in order to avoid unexpected performance regressions for workloads that are unaffected by the Configuration thread-safety issues.
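The guard itself is an ordinary defaulted boolean lookup. A minimal sketch of the same pattern, with a plain `Map` standing in for `SparkConf` (the key name `spark.hadoop.cloneConf` and the `"false"` default come from the patch; the rest is illustrative):

```scala
object ConfGuardSketch {
  // Mirrors sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean,
  // with an immutable Map standing in for SparkConf.
  def shouldCloneJobConf(conf: Map[String, String]): Boolean =
    conf.getOrElse("spark.hadoop.cloneConf", "false").toBoolean

  def main(args: Array[String]): Unit = {
    assert(!shouldCloneJobConf(Map.empty))                              // off by default
    assert(shouldCloneJobConf(Map("spark.hadoop.cloneConf" -> "true"))) // explicit opt-in
    println("cloning is opt-in")
  }
}
```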

Author: Josh Rosen <joshrosen@apache.org>

Closes #2684 from JoshRosen/jobconf-fix-backport and squashes the following commits:

f14f259 [Josh Rosen] Add configuration option to control cloning of Hadoop JobConf.
b562451 [Josh Rosen] Remove unused jobConfCacheKey field.
dd25697 [Josh Rosen] [SPARK-2546] [1.0 / 1.1 backport] Clone JobConf for each task.
parent 327404d8
@@ -129,27 +129,47 @@ class HadoopRDD[K, V](
   // used to build JobTracker ID
   private val createTime = new Date()
 
+  private val shouldCloneJobConf = sc.conf.get("spark.hadoop.cloneConf", "false").toBoolean
+
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
   protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
-    if (conf.isInstanceOf[JobConf]) {
-      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-      conf.asInstanceOf[JobConf]
-    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
-      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      // Synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456).
-      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
-        val newJobConf = new JobConf(conf)
-        initLocalJobConfFuncOpt.map(f => f(newJobConf))
-        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-        newJobConf
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546). This problem occurs
+      // somewhat rarely because most jobs treat the configuration as though it's immutable. One
+      // solution, implemented here, is to clone the Configuration object. Unfortunately, this
+      // clone can be very expensive. To avoid unexpected performance regressions for workloads and
+      // Hadoop versions that do not suffer from these thread-safety issues, this cloning is
+      // disabled by default.
+      HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        val newJobConf = new JobConf(conf)
+        if (!conf.isInstanceOf[JobConf]) {
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        }
+        newJobConf
+      }
+    } else {
+      if (conf.isInstanceOf[JobConf]) {
+        logDebug("Re-using user-broadcasted JobConf")
+        conf.asInstanceOf[JobConf]
+      } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+        logDebug("Re-using cached JobConf")
+        HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+      } else {
+        // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
+        // the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+        // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+        // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+        HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+          logDebug("Creating new JobConf and caching it for later re-use")
+          val newJobConf = new JobConf(conf)
+          initLocalJobConfFuncOpt.map(f => f(newJobConf))
+          HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+          newJobConf
+        }
+      }
     }
   }
@@ -257,7 +277,10 @@ class HadoopRDD[K, V](
 }
 
 private[spark] object HadoopRDD {
-  /** Constructing Configuration objects is not threadsafe, use this lock to serialize. */
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new JobConf() or new Configuration().
+   */
   val CONFIGURATION_INSTANTIATION_LOCK = new Object()
 
   /**
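The lock discussed above exists because even *constructing* a `Configuration` is racy (SPARK-1097, HADOOP-10456), so every construction is funneled through one shared monitor. A generic sketch of that pattern, with `java.util.Properties` as a hypothetical stand-in for the non-thread-safe type:

```scala
import java.util.Properties
import java.util.concurrent.ConcurrentLinkedQueue

object ConstructionLockSketch {
  // Mirrors HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK: a plain Object whose
  // monitor serializes every construction of the non-thread-safe type.
  val CONSTRUCTION_LOCK = new Object()

  def newConf(): Properties = CONSTRUCTION_LOCK.synchronized {
    new Properties() // at most one constructor runs at a time
  }

  def main(args: Array[String]): Unit = {
    val built = new ConcurrentLinkedQueue[Properties]()
    val threads = (1 to 8).map(_ => new Thread(() => { built.add(newConf()); () }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    assert(built.size == 8)
    println(s"constructed ${built.size} instances under one lock")
  }
}
```

Note that a plain `Object` monitor is enough here; the lock protects only the constructor call, not later use of the constructed instances.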
@@ -582,6 +582,15 @@ Apart from these, the following properties are also available, and may be useful
     output directories. We recommend that users do not disable this except if trying to achieve compatibility with
     previous versions of Spark. Simply use Hadoop's FileSystem API to delete output directories by hand.</td>
 </tr>
+<tr>
+  <td><code>spark.hadoop.cloneConf</code></td>
+  <td>false</td>
+  <td>If set to true, clones a new Hadoop <code>Configuration</code> object for each task. This
+    option should be enabled to work around <code>Configuration</code> thread-safety issues (see
+    <a href="https://issues.apache.org/jira/browse/SPARK-2546">SPARK-2546</a> for more details).
+    This is disabled by default in order to avoid unexpected performance regressions for jobs that
+    are not affected by these issues.</td>
+</tr>
 <tr>
   <td><code>spark.executor.heartbeatInterval</code></td>
   <td>10000</td>