Commit e8ae77df authored by Matei Zaharia

Added more methods for loading/saving with new Hadoop API

parent 0a472840
@@ -13,6 +13,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@@ -297,8 +298,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       path: String,
       keyClass: Class[_],
       valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
-    val job = new NewAPIHadoopJob
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     val wrappedConf = new SerializableWritable(job.getConfiguration)
@@ -339,6 +341,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     jobCommitter.cleanupJob(jobTaskContext)
   }
 
+  def saveAsNewAPIHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+    saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration)
+  }
+
   def saveAsHadoopFile(
       path: String,
       keyClass: Class[_],
......
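Together, the PairRDDFunctions hunks above extend saveAsNewAPIHadoopFile to accept a Hadoop Configuration (passed into the new-API Job) and add a convenience overload that falls back to new Configuration. The sketch below shows how the two overloads might be called; it assumes the pre-Apache "spark" package layout of this era, and the object name, output path, and compression setting are purely illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

import spark.SparkContext
import spark.SparkContext._  // implicit conversion of RDD[(K, V)] to PairRDDFunctions

object SaveExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SaveExample")

    // Convert to Writable keys/values inside the RDD so the Hadoop output format can write them.
    val pairs = sc.parallelize(Seq("a" -> 1, "b" -> 2), 2)
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    // Extra Hadoop settings ride along in the Configuration parameter added by this commit.
    val conf = new Configuration()
    conf.set("mapred.output.compress", "true")  // illustrative setting

    // New overload: output format class plus an explicit Configuration.
    pairs.saveAsNewAPIHadoopFile(
      "/tmp/save-example",                                    // illustrative path
      classOf[Text],
      classOf[IntWritable],
      classOf[SequenceFileOutputFormat[Text, IntWritable]],
      conf)

    // The shorter overload (second hunk above) simply delegates with `new Configuration`.
    sc.stop()
  }
}

Creating the Writables inside the map keeps the driver-side data as plain Scala values, so only the Hadoop output format ever sees Text and IntWritable.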
@@ -202,8 +202,24 @@ class SparkContext(
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration
-  ): RDD[(K, V)] = new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      conf: Configuration): RDD[(K, V)] = {
+    val job = new NewHadoopJob(conf)
+    NewFileInputFormat.addInputPath(job, new Path(path))
+    val updatedConf = job.getConfiguration
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+  }
+
+  /**
+   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
+   * and extra configuration options to pass to the input format.
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      conf: Configuration,
+      fClass: Class[F],
+      kClass: Class[K],
+      vClass: Class[V]): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+  }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types */
   def sequenceFile[K, V](path: String,
......
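On the loading side, newAPIHadoopFile now wraps the supplied Configuration in a new-API Job and registers the input path via NewFileInputFormat.addInputPath, while the added newAPIHadoopRDD takes a fully prepared Configuration as-is. A minimal sketch of both calls, under the same assumptions as above (pre-Apache "spark" package, illustrative paths and names):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

import spark.SparkContext

object LoadExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "LoadExample")

    // Variant 1: pass a path plus extra configuration; per the diff above,
    // the method wraps the conf in a Job and adds the input path itself.
    val extraConf = new Configuration()
    val lines = sc.newAPIHadoopFile(
      "/tmp/load-example",          // illustrative path
      classOf[TextInputFormat],     // new-API org.apache.hadoop.mapreduce input format
      classOf[LongWritable],
      classOf[Text],
      extraConf)

    // Variant 2: prepare the Configuration up front (input path included, set here
    // through a Job the same way newAPIHadoopFile does) and hand it to newAPIHadoopRDD.
    val job = new Job(new Configuration())
    FileInputFormat.addInputPath(job, new Path("/tmp/load-example"))
    val lines2 = sc.newAPIHadoopRDD(
      job.getConfiguration,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text])

    // Both yield RDD[(LongWritable, Text)]: byte offset and line text.
    println(lines.count() + " records, " + lines2.count() + " records")
    sc.stop()
  }
}

Variant 2 does by hand what variant 1 does internally, which is why the input path must already be on the Configuration before newAPIHadoopRDD is called.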