Skip to content
Snippets Groups Projects
Commit 99796904 authored by Reynold Xin's avatar Reynold Xin
Browse files

Merge pull request #52 from harveyfeng/hadoop-closure

Add an optional closure parameter to HadoopRDD instantiation to use when creating local JobConfs.

Having HadoopRDD accept this optional closure eliminates the need for the HadoopFileRDD added earlier. It makes the HadoopRDD more general, in that the caller can specify any JobConf initialization flow.
parents dca80094 6c32aab8
No related branches found
No related tags found
No related merge requests found
......@@ -335,7 +335,7 @@ class SparkContext(
}
/**
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
* Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
*/
......@@ -361,24 +361,15 @@ class SparkContext(
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
* Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
* that has already been broadcast, assuming that it's safe to use it to construct a
* HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
*/
def hadoopFile[K, V](
path: String,
confBroadcast: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int
): RDD[(K, V)] = {
new HadoopFileRDD(
this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minSplits)
}
/**
......
......@@ -33,41 +33,6 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
/**
* An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
* system, or S3).
* This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
* across multiple reads; the 'path' is the only variable that is different across new JobConfs
* created from the Configuration.
*/
class HadoopFileRDD[K, V](
sc: SparkContext,
path: String,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
override def getJobConf(): JobConf = {
if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
// getJobConf() has been called previously, so there is already a local cache of the JobConf
// needed by this RDD.
return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a new JobConf, set the input file/directory paths to read from, and cache the
// JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
// HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
// getJobConf() calls for this RDD in the local process.
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
FileInputFormat.setInputPaths(newJobConf, path)
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
}
}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
......@@ -83,11 +48,24 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
* A base class that provides core functionality for reading data partitions stored in Hadoop.
* An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
* sources in HBase, or S3).
*
* @param sc The SparkContext to associate the RDD with.
* @param broadCastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
* variabe references an instance of JobConf, then that JobConf will be used for the Hadoop job.
* Otherwise, a new JobConf will be created on each slave using the enclosed Configuration.
* @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD
* creates.
* @param inputFormatClass Storage format of the data to be read.
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
*/
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
......@@ -105,6 +83,7 @@ class HadoopRDD[K, V](
sc,
sc.broadcast(new SerializableWritable(conf))
.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
valueClass,
......@@ -130,6 +109,7 @@ class HadoopRDD[K, V](
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
val newJobConf = new JobConf(broadcastedConf.value.value)
initLocalJobConfFuncOpt.map(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
return newJobConf
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment