diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index ea24c7897d6f7d48225cb7dc686b4694bf6e38bc..843badd9d1c9b0ae62cecb2dbb710910d2485242 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.NullWritable
@@ -297,8 +298,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       path: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
-    val job = new NewAPIHadoopJob
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     val wrappedConf = new SerializableWritable(job.getConfiguration)
@@ -339,6 +341,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     jobCommitter.cleanupJob(jobTaskContext)
   }
 
+  def saveAsNewAPIHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+    saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration)
+  }
+
   def saveAsHadoopFile(
       path: String,
       keyClass: Class[_],
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 4a6abf20b0a0b52faa22fbad52d9305ca88dd6da..3d3fda1e47967b7790bfc12f4accacbeead615e3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -202,8 +202,24 @@ class SparkContext(
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration
-    ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      conf: Configuration): RDD[(K, V)] = {
+    val job = new NewHadoopJob(conf)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updatedConf = job.getConfiguration
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+  }
+
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      conf: Configuration,
+      fClass: Class[F],
+      kClass: Class[K],
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types */
   def sequenceFile[K, V](path: String,
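
For reference, a rough usage sketch of the call paths this patch enables (not part of the diff itself): the `sc` SparkContext, the HDFS paths, and the choice of LongWritable/Text with Hadoop's new-API TextInputFormat/TextOutputFormat are illustrative assumptions, not taken from the change.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

import spark.SparkContext._  // implicit conversion to PairRDDFunctions

// Extra options the input/output format should see are set on a plain Configuration
// and handed to the new overloads; the paths below are hypothetical.
val conf = new Configuration()

// Reading: newAPIHadoopFile now merges the caller's conf into a Hadoop Job
// (adding the input path) before constructing the RDD.
val lines = sc.newAPIHadoopFile("hdfs:///tmp/input", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], conf)

// Writing: the 5-argument saveAsNewAPIHadoopFile takes the caller's conf; the old
// 4-argument form now simply delegates to it with `new Configuration`.
lines.saveAsNewAPIHadoopFile("hdfs:///tmp/output", classOf[LongWritable], classOf[Text],
  classOf[TextOutputFormat[LongWritable, Text]], conf)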