diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7e681d921bd439dbc4a71ef5665fdc8a874b1a9..9a3d36b51e4dfa8f334b0cc64b359ae2c9e25d0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -345,9 +345,20 @@ class SparkContext(
   }
 
   /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
+   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
+   * using the older MapReduce API (`org.apache.hadoop.mapred`).
+   *
+   * @param conf JobConf for setting up the dataset
+   * @param inputFormatClass Class of the [[InputFormat]]
+   * @param keyClass Class of the keys
+   * @param valueClass Class of the values
+   * @param minSplits Minimum number of Hadoop Splits to generate.
+   * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+   *                     Most RecordReader implementations reuse wrapper objects across multiple
+   *                     records, which can cause problems in RDD collect or aggregation operations.
+   *                     By default the records are cloned in Spark. However, application
+   *                     programmers can explicitly disable the cloning for better performance.
    */
   def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
@@ -355,11 +366,11 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
     ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
@@ -369,7 +380,7 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
     ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -382,7 +393,7 @@ class SparkContext(
       keyClass,
       valueClass,
       minSplits,
-      cloneKeyValues)
+      cloneRecords)
   }
 
   /**
@@ -394,14 +405,14 @@ class SparkContext(
    * }}}
    */
   def hadoopFile[K, V, F <: InputFormat[K, V]]
-      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (path: String, minSplits: Int, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
       minSplits,
-      cloneKeyValues = cloneKeyValues)
+      cloneRecords)
   }
 
   /**
@@ -412,20 +423,20 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
   def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
-      (path: String, cloneKeyValues: Boolean = true)
+      (path: String, cloneRecords: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
         path,
         fm.runtimeClass.asInstanceOf[Class[F]],
         km.runtimeClass.asInstanceOf[Class[K]],
         vm.runtimeClass.asInstanceOf[Class[V]],
-        cloneKeyValues = cloneKeyValues)
+        cloneRecords = cloneRecords)
   }
 
   /**
@@ -438,11 +449,11 @@ class SparkContext(
       kClass: Class[K],
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration,
-      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+      cloneRecords: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
   }
 
   /**
@@ -454,8 +465,8 @@ class SparkContext(
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
+      cloneRecords: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
@@ -463,16 +474,16 @@ class SparkContext(
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int,
-      cloneKeyValues: Boolean = true
+      cloneRecords: Boolean = true
     ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
   def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
-      cloneKeyValues: Boolean = true): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
+      cloneRecords: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -490,17 +501,18 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
-      kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+  def sequenceFile[K, V]
+      (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V],
+          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
     val vc = vcf()
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
       kc.writableClass(km).asInstanceOf[Class[Writable]],
-      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
-    writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
+      vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+    writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2da4611b9c0ff64689a958d4ede91c7681c61886..902083c24f088b04239b6bbd89087cf84c9ccf63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -45,14 +45,14 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
 
   val inputSplit = new SerializableWritable[InputSplit](s)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
 
   override val index: Int = idx
 }
 
 /**
  * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3).
+ * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
  *
  * @param sc The SparkContext to associate the RDD with.
  * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed
@@ -64,6 +64,11 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param keyClass Class of the key associated with the inputFormatClass.
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ *                     Most RecordReader implementations reuse wrapper objects across multiple
+ *                     records, which can cause problems in RDD collect or aggregation operations.
+ *                     By default the records are cloned in Spark. However, application
+ *                     programmers can explicitly disable the cloning for better performance.
  */
 class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
@@ -73,7 +78,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int,
-    cloneKeyValues: Boolean)
+    cloneRecords: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -83,7 +88,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       keyClass: Class[K],
       valueClass: Class[V],
       minSplits: Int,
-      cloneKeyValues: Boolean) = {
+      cloneRecords: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -93,7 +98,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       keyClass,
       valueClass,
       minSplits,
-      cloneKeyValues)
+      cloneRecords)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -165,9 +170,9 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
-      val keyCloneFunc = cloneWritables[K](getConf)
+      val keyCloneFunc = cloneWritables[K](jobConf)
       val value: V = reader.createValue()
-      val valueCloneFunc = cloneWritables[V](getConf)
+      val valueCloneFunc = cloneWritables[V](jobConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -175,9 +180,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
           case eof: EOFException =>
             finished = true
         }
-        if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+        if (cloneRecords) {
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index a34786495bd999641044541f6daf40cfdf12e004..992bd4aa0ad5dbc17283048a1f0613a3ea3144b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -36,16 +36,31 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
 
   val serializableHadoopSplit = new SerializableWritable(rawSplit)
 
-  override def hashCode(): Int = (41 * (41 + rddId) + index)
+  override def hashCode(): Int = 41 * (41 + rddId) + index
 }
 
+/**
+ * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
+ * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
+ *
+ * @param sc The SparkContext to associate the RDD with.
+ * @param inputFormatClass Storage format of the data to be read.
+ * @param keyClass Class of the key associated with the inputFormatClass.
+ * @param valueClass Class of the value associated with the inputFormatClass.
+ * @param conf The Hadoop configuration.
+ * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
+ *                     Most RecordReader implementations reuse wrapper objects across multiple
+ *                     records, which can cause problems in RDD collect or aggregation operations.
+ *                     By default the records are cloned in Spark. However, application
+ *                     programmers can explicitly disable the cloning for better performance.
+ */
 class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
     @transient conf: Configuration,
-    cloneKeyValues: Boolean)
+    cloneRecords: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -112,9 +127,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
         havePair = false
         val key = reader.getCurrentKey
         val value = reader.getCurrentValue
-        if (cloneKeyValues) {
-          (keyCloneFunc(key.asInstanceOf[Writable]),
-            valueCloneFunc(value.asInstanceOf[Writable]))
+        if (cloneRecords) {
+          (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }
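
The sketch below is not part of the patch; it is a minimal illustration of how the renamed `cloneRecords` flag might be used from application code once the change above is applied. The object name, the `local[2]` master, the application name, and the `hdfs:///tmp/input.txt` path are assumptions made for the example, and the calls rely on the `hadoopFile` overloads shown in the SparkContext diff.

{{{
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

object CloneRecordsExample {
  def main(args: Array[String]) {
    // Hypothetical local setup; any SparkContext and input path would do.
    val sc = new SparkContext("local[2]", "cloneRecordsExample")
    val inputPath = "hdfs:///tmp/input.txt"

    // Default behaviour (cloneRecords = true): every (key, value) pair handed to user code
    // is a copy, so downstream operations may safely buffer the records themselves.
    val lineCount = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath).count()

    // Cloning disabled (the second argument is cloneRecords): cheaper, but the RecordReader
    // reuses the same Writable instances, so each record must be turned into an immutable
    // value before the next one is read -- here each Text is reduced to its byte length.
    val totalBytes = sc.hadoopFile[LongWritable, Text, TextInputFormat](inputPath, false)
      .map { case (_, text) => text.getLength.toLong }
      .reduce(_ + _)

    println("lines: " + lineCount + ", bytes: " + totalBytes)
    sc.stop()
  }
}
}}}

Either way, the flag only controls whether HadoopRDD and NewHadoopRDD copy the Writables before handing them to user code; it does not change which records are read.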