From ac310098ef6a195981080a0ae840533141780943 Mon Sep 17 00:00:00 2001
From: Patrick Wendell <pwendell@gmail.com>
Date: Mon, 8 Oct 2012 22:25:03 -0700
Subject: [PATCH] More docs in RDD class

---
 core/src/main/scala/spark/RDD.scala | 46 ++++++++++++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 42d5b821f8..4d984591bd 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -164,6 +164,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   // Transformations (return a new RDD)
 
+  /**
+   * Return a new RDD by applying a function to all elements of this RDD.
+   */
   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
 
   /**
@@ -184,6 +187,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def distinct(numSplits: Int = splits.size): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
 
+  /**
+   * Return a sampled subset of this RDD.
+   */
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
     new SampledRDD(this, withReplacement, fraction, seed)
 
@@ -232,27 +238,53 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
    */
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
+  /**
+   * Return an RDD created by coalescing all elements within each partition into an array.
+   */
   def glom(): RDD[Array[T]] = new GlommedRDD(this)
 
   def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
+  /**
+   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+   * mapping to that key.
+   */
   def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(numSplits)
   }
 
+  /**
+   * Return an RDD of grouped items.
+   */
   def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: String): RDD[String] = new PipedRDD(this, command)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
     new PipedRDD(this, command, env)
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
   def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
     new MapPartitionsRDD(this, sc.clean(f))
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
   def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
     new MapPartitionsWithSplitRDD(this, sc.clean(f))
 
@@ -273,9 +305,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
-  
+
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
   def toArray(): Array[T] = collect()
 
+  /**
+   * Reduces the elements of this RDD using the specified associative binary operator.
+   */
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
     val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -432,11 +470,17 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     case _ => throw new UnsupportedOperationException("empty collection")
   }
 
+  /**
+   * Save this RDD as a text file, using string representations of elements.
+   */
   def saveAsTextFile(path: String) {
     this.map(x => (NullWritable.get(), new Text(x.toString)))
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
 
+  /**
+   * Save this RDD as a SequenceFile of serialized objects.
+   */
   def saveAsObjectFile(path: String) {
     this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
       .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
--
GitLab
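As context for the methods this patch documents, here is a minimal usage sketch. It is not part of the patch; it assumes the pre-1.0 "spark" package layout, a local master URL, and a hypothetical driver object named RDDDocExample.

import spark.SparkContext

object RDDDocExample {
  def main(args: Array[String]) {
    // Local master and application name are illustrative placeholders.
    val sc = new SparkContext("local", "RDDDocExample")
    val nums = sc.parallelize(1 to 10, 2)

    // map: apply a function to every element of the RDD.
    val squares = nums.map(x => x * x)

    // reduce: combine elements with an associative binary operator.
    println(squares.reduce(_ + _))  // 385

    // groupBy: key each element by a derived value (here, its parity).
    println(nums.groupBy(x => x % 2).collect().toSeq)

    // mapPartitionsWithSplit: transform each partition while tracking its index.
    println(nums.mapPartitionsWithSplit((idx, iter) => iter.map(x => (idx, x))).collect().toSeq)

    sc.stop()
  }
}

Run locally, this prints the sum of squares (385), the elements grouped by parity, and each element paired with the index of the partition it came from.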