diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index febcf9c6ee7c1b6d84553eca1e265e782e736be7..d9c62648550da47b89aa5554bdb1a3c84f45d203 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -330,7 +330,7 @@ class SparkContext(
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
@@ -346,34 +346,49 @@ class SparkContext(
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  /**
+   * Get an RDD for a Hadoop file with an arbitrary InputFormat. Accept a Hadoop Configuration
+   * that has already been broadcast and use it to construct JobConfs local to each process. These
+   * JobConfs will be initialized using an optional, user-specified closure.
+   */
+  def hadoopRDD[K, V](
       path: String,
+      confBroadcast: Broadcast[SerializableWritable[Configuration]],
+      initLocalJobConfOpt: Option[JobConf => Unit],
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
-    ): RDD[(K, V)] = {
-    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
-    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
-    hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+      minSplits: Int
+    ): RDD[(K, V)] = {
+    new HadoopRDD(
+      this,
+      confBroadcast,
+      initLocalJobConfOpt,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
   }
 
-  /**
-   * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
-   * that has already been broadcast, assuming that it's safe to use it to construct a
-   * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
-   */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
   def hadoopFile[K, V](
       path: String,
-      confBroadcast: Broadcast[SerializableWritable[Configuration]],
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int = defaultMinSplits
     ): RDD[(K, V)] = {
-    new HadoopFileRDD(
-      this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+    val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+    val setInputPathsFunc = Some((jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path))
+    new HadoopRDD(
+      this,
+      confBroadcast,
+      setInputPathsFunc,
+      inputFormatClass,
+      keyClass,
+      valueClass,
+      minSplits)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index d3b3fffd409d17dfda16aca529596b064b9b1058..4ecdd65e9b8516d1684a58f77dcd244ccf85ec2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -33,41 +33,6 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 
-/**
- * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
- * system, or S3).
- * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
- * across multiple reads; the 'path' is the only variable that is different across new JobConfs
- * created from the Configuration.
- */
-class HadoopFileRDD[K, V](
-    sc: SparkContext,
-    path: String,
-    broadcastedConf: Broadcast[SerializableWritable[Configuration]],
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int)
-  extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
-
-  override def getJobConf(): JobConf = {
-    if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
-      // getJobConf() has been called previously, so there is already a local cache of the JobConf
-      // needed by this RDD.
-      return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
-    } else {
-      // Create a new JobConf, set the input file/directory paths to read from, and cache the
-      // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
-      // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
-      // getJobConf() calls for this RDD in the local process.
-      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      FileInputFormat.setInputPaths(newJobConf, path)
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      return newJobConf
-    }
-  }
-}
 
 /**
  * A Spark split class that wraps around a Hadoop InputSplit.
@@ -83,11 +48,24 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 }
 
 /**
- * A base class that provides core functionality for reading data partitions stored in Hadoop.
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
+ * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job.
+ * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
+ * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
+ * creates.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
 class HadoopRDD[K, V](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+    initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
@@ -105,6 +83,7 @@ class HadoopRDD[K, V](
       sc,
       sc.broadcast(new SerializableWritable(conf))
         .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+      None /* initLocalJobConfFuncOpt */,
       inputFormatClass,
       keyClass,
       valueClass,
@@ -130,6 +109,7 @@ class HadoopRDD[K, V](
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
       val newJobConf = new JobConf(broadcastedConf.value.value)
+      initLocalJobConfFuncOpt.map(f => f(newJobConf))
       HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
       return newJobConf
     }
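For reviewers, here is a minimal usage sketch of the pattern this patch introduces: the Hadoop Configuration is broadcast once, and an optional `JobConf => Unit` closure describes how each process should initialize the `JobConf` it builds and caches locally. This is illustrative only and not part of the patch; the local master, app name, object name, and input path `/tmp/input` are placeholders, and most callers would simply use `hadoopFile`, which now wires up the same input-path closure internally.

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}
import org.apache.spark.{SerializableWritable, SparkContext}

object HadoopRDDSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "HadoopRDD sketch")  // placeholder master/app name
    val inputPath = "/tmp/input"                            // hypothetical input directory

    // Broadcast the (potentially ~10 KB) Hadoop Configuration once, instead of shipping a
    // full JobConf with every task.
    val confBroadcast = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))

    // Optional closure that initializes each locally created JobConf; here it only sets the
    // input paths, mirroring what hadoopFile() now does on the caller's behalf.
    val initLocalJobConf =
      Some((jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, inputPath))

    val records = sc.hadoopRDD(
      inputPath,
      confBroadcast,
      initLocalJobConf,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      2 /* minSplits */)

    println("record count: " + records.count())
    sc.stop()
  }
}
```

The design choice shows up in `getJobConf()`: the closure runs once per process against the locally cached `JobConf`, so per-RDD specialization such as setting the input path no longer needs a dedicated `HadoopFileRDD` subclass.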