Skip to content
Snippets Groups Projects
Commit ac310098 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

More docs in RDD class

parent bd688940
No related branches found
No related tags found
No related merge requests found
...@@ -164,6 +164,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -164,6 +164,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
// Transformations (return a new RDD) // Transformations (return a new RDD)
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f)) def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
/** /**
...@@ -184,6 +187,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -184,6 +187,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def distinct(numSplits: Int = splits.size): RDD[T] = def distinct(numSplits: Int = splits.size): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
/**
* Return a sampled subset of this RDD.
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] = def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed) new SampledRDD(this, withReplacement, fraction, seed)
...@@ -232,27 +238,53 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -232,27 +238,53 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
*/ */
def ++(other: RDD[T]): RDD[T] = this.union(other) def ++(other: RDD[T]): RDD[T] = this.union(other)
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = new GlommedRDD(this) def glom(): RDD[Array[T]] = new GlommedRDD(this)
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other) def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = { def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f) val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(numSplits) this.map(t => (cleanF(t), t)).groupByKey(numSplits)
} }
/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism) def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: String): RDD[String] = new PipedRDD(this, command) def pipe(command: String): RDD[String] = new PipedRDD(this, command)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command) def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
/**
* Return an RDD created by piping elements to a forked external process.
*/
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] = def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env) new PipedRDD(this, command, env)
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] = def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
new MapPartitionsRDD(this, sc.clean(f)) new MapPartitionsRDD(this, sc.clean(f))
/**
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] = def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
new MapPartitionsWithSplitRDD(this, sc.clean(f)) new MapPartitionsWithSplitRDD(this, sc.clean(f))
...@@ -273,9 +305,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -273,9 +305,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*) Array.concat(results: _*)
} }
/**
* Return an array that contains all of the elements in this RDD.
*/
def toArray(): Array[T] = collect() def toArray(): Array[T] = collect()
/**
* Reduces the elements of this RDD using the specified associative binary operator.
*/
def reduce(f: (T, T) => T): T = { def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f) val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => { val reducePartition: Iterator[T] => Option[T] = iter => {
...@@ -432,11 +470,17 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial ...@@ -432,11 +470,17 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
case _ => throw new UnsupportedOperationException("empty collection") case _ => throw new UnsupportedOperationException("empty collection")
} }
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) { def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString))) this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
} }
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) { def saveAsObjectFile(path: String) {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray)) this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x)))) .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment