Commit 8074208f authored by Mingyu Kim, committed by Josh Rosen

[SPARK-10611] Clone Configuration for each task for NewHadoopRDD

This patch attempts to fix the Hadoop Configuration thread safety issue for NewHadoopRDD in the same way SPARK-2546 fixed the issue for HadoopRDD.

Author: Mingyu Kim <mkim@palantir.com>

Closes #8763 from mingyukim/mkim/SPARK-10611.
parent 348d7c9a
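
For context, a minimal usage sketch (not part of this commit): the patch reads the existing "spark.hadoop.cloneConf" flag, introduced by SPARK-2546 for HadoopRDD, so per-task cloning stays opt-in. Assuming a standard driver setup, enabling it looks like this; the app name is illustrative only.

    import org.apache.spark.{SparkConf, SparkContext}

    val sparkConf = new SparkConf()
      .setAppName("clone-conf-example")      // hypothetical app name
      .set("spark.hadoop.cloneConf", "true") // opt in to per-task Configuration clones
    val sc = new SparkContext(sparkConf)

    // RDDs backed by NewHadoopRDD (e.g. sc.newAPIHadoopFile(...)) will now call
    // getConf per task and receive a fresh clone of the broadcast Configuration.

With the flag left at its default of false, getConf returns the shared broadcast Configuration, preserving the old behavior and avoiding the cost of cloning.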
@@ -34,12 +34,13 @@ private[spark] class BinaryFileRDD[T](
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
@@ -44,7 +44,6 @@ private[spark] class NewHadoopPartition(
   extends Partition {

   val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
   override def hashCode(): Int = 41 * (41 + rddId) + index
 }
@@ -84,6 +83,27 @@ class NewHadoopRDD[K, V](
   @transient protected val jobId = new JobID(jobTrackerId, id)

+  private val shouldCloneJobConf = sparkContext.conf.getBoolean("spark.hadoop.cloneConf", false)
+
+  def getConf: Configuration = {
+    val conf: Configuration = confBroadcast.value.value
+    if (shouldCloneJobConf) {
+      // Hadoop Configuration objects are not thread-safe, which may lead to various problems if
+      // one job modifies a configuration while another reads it (SPARK-2546, SPARK-10611). This
+      // problem occurs somewhat rarely because most jobs treat the configuration as though it's
+      // immutable. One solution, implemented here, is to clone the Configuration object.
+      // Unfortunately, this clone can be very expensive. To avoid unexpected performance
+      // regressions for workloads and Hadoop versions that do not suffer from these thread-safety
+      // issues, this cloning is disabled by default.
+      NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
+        logDebug("Cloning Hadoop Configuration")
+        new Configuration(conf)
+      }
+    } else {
+      conf
+    }
+  }
+
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
     inputFormat match {
@@ -104,7 +124,7 @@ class NewHadoopRDD[K, V](
     val iter = new Iterator[(K, V)] {
       val split = theSplit.asInstanceOf[NewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
-      val conf = confBroadcast.value.value
+      val conf = getConf
       val inputMetrics = context.taskMetrics
         .getInputMetricsForReadMethod(DataReadMethod.Hadoop)
@@ -230,11 +250,15 @@ class NewHadoopRDD[K, V](
     super.persist(storageLevel)
   }

-  def getConf: Configuration = confBroadcast.value.value
 }

 private[spark] object NewHadoopRDD {
+  /**
+   * Configuration's constructor is not threadsafe (see SPARK-1097 and HADOOP-10456).
+   * Therefore, we synchronize on this lock before calling new Configuration().
+   */
+  val CONFIGURATION_INSTANTIATION_LOCK = new Object()
+
   /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
    * the given function rather than the index of the partition.
@@ -268,12 +292,13 @@ private[spark] class WholeTextFileRDD(
   override def getPartitions: Array[Partition] = {
     val inputFormat = inputFormatClass.newInstance
+    val conf = getConf
     inputFormat match {
       case configurable: Configurable =>
-        configurable.setConf(getConf)
+        configurable.setConf(conf)
       case _ =>
     }
-    val jobContext = newJobContext(getConf, jobId)
+    val jobContext = newJobContext(conf, jobId)
     inputFormat.setMinPartitions(jobContext, minPartitions)
     val rawSplits = inputFormat.getSplits(jobContext).toArray
     val result = new Array[Partition](rawSplits.size)
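
To make the locking discipline concrete, here is a minimal standalone sketch of the pattern this patch adds around CONFIGURATION_INSTANTIATION_LOCK (assumed object and method names; not the actual Spark source). Because Configuration's copy constructor touches shared static state (SPARK-1097, HADOOP-10456), every clone is created while holding a single JVM-wide lock:

    import org.apache.hadoop.conf.Configuration

    object ConfCloner {
      // Mirrors NewHadoopRDD.CONFIGURATION_INSTANTIATION_LOCK: one global lock
      // serializing all calls to the (non-threadsafe) Configuration constructor.
      private val CONFIGURATION_INSTANTIATION_LOCK = new Object()

      // Each task calls this to get its own mutable copy of the shared,
      // broadcast Configuration, so concurrent reads and writes cannot collide.
      def cloneConf(shared: Configuration): Configuration =
        CONFIGURATION_INSTANTIATION_LOCK.synchronized {
          new Configuration(shared)
        }
    }

The per-task clone is what restores thread safety between tasks; the lock only guards the constructor call itself, since two threads constructing Configurations concurrently can corrupt the shared default-resource state.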