From e8ae77df2458a80e549c4e227b1bc509dc4ea2ce Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Tue, 10 Jul 2012 11:11:35 -0700
Subject: [PATCH] Added more methods for loading/saving with new Hadoop API

---
 .../main/scala/spark/PairRDDFunctions.scala   | 14 +++++++++++--
 core/src/main/scala/spark/SparkContext.scala  | 20 +++++++++++++++++--
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index ea24c7897d..843badd9d1 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -13,6 +13,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.NullWritable
@@ -297,8 +298,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       path: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
-    val job = new NewAPIHadoopJob
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     val wrappedConf = new SerializableWritable(job.getConfiguration)
@@ -339,6 +341,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     jobCommitter.cleanupJob(jobTaskContext)
   }
 
+  def saveAsNewAPIHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+    saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration)
+  }
+
   def saveAsHadoopFile(
       path: String,
       keyClass: Class[_],
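
For reference, a minimal usage sketch of the two save overloads above (not part of this
patch; the SparkContext `sc`, the output paths, and the choice of the new-API
TextOutputFormat are illustrative assumptions):

    import spark.SparkContext._   // assumed to bring the implicit PairRDDFunctions conversion into scope
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

    // Assumed: an existing SparkContext `sc`; build a small pair RDD of Writables.
    val pairs = sc.parallelize(Seq(("a", "1"), ("b", "2")))
                  .map { case (k, v) => (new Text(k), new Text(v)) }

    // Overload without a Configuration: delegates to the full method with `new Configuration`.
    pairs.saveAsNewAPIHadoopFile("hdfs:///tmp/out-default",
      classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]])

    // Overload with an explicit Configuration, letting callers pass extra job
    // options through to the output format.
    val conf = new Configuration()
    conf.set("mapred.output.compress", "false")   // illustrative Hadoop 1.x property
    pairs.saveAsNewAPIHadoopFile("hdfs:///tmp/out-tuned",
      classOf[Text], classOf[Text], classOf[TextOutputFormat[Text, Text]], conf)
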
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 4a6abf20b0..3d3fda1e47 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -202,8 +202,24 @@ class SparkContext(
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration
-      ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      conf: Configuration): RDD[(K, V)] = {
+    val job = new NewHadoopJob(conf)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updatedConf = job.getConfiguration
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+  }
+
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      conf: Configuration,
+      fClass: Class[F],
+      kClass: Class[K],
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types */
   def sequenceFile[K, V](path: String,
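
For reference, a minimal usage sketch of the loading-side changes (not part of this
patch; the SparkContext `sc`, the input path, and the choice of the new-API
TextInputFormat are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

    // newAPIHadoopFile: the path is added to a fresh Job built from `conf`, so any
    // extra options set on `conf` reach the input format.
    val conf = new Configuration()
    val lines = sc.newAPIHadoopFile("hdfs:///tmp/in",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)

    // newAPIHadoopRDD: the caller prepares the Configuration entirely (including
    // input paths), mirroring what newAPIHadoopFile does internally.
    val job = new Job(new Configuration())
    FileInputFormat.addInputPath(job, new Path("hdfs:///tmp/in"))
    val rows = sc.newAPIHadoopRDD(job.getConfiguration,
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
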
-- 
GitLab