Skip to content
Snippets Groups Projects
Commit 96f0be93 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Added groupBy function in RDD

parent 72ec298c
No related branches found
No related tags found
No related merge requests found
......@@ -8,6 +8,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
import SparkContext._
import mesos._
......@@ -90,6 +92,12 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
def cartesian[U: ClassManifest](other: RDD[U]) =
new CartesianRDD(sc, this, other)
def groupBy[K](func: T => K, numSplits: Int): RDD[(K, Seq[T])] =
this.map(t => (func(t), t)).groupByKey(numSplits)
def groupBy[K](func: T => K): RDD[(K, Seq[T])] =
groupBy[K](func, sc.numCores)
}
@serializable
......@@ -368,7 +376,7 @@ extends RDD[Pair[T, U]](sc) {
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
val vs: RDD[(K, Either[V, W])] = self.map { case (k, v) => (k, Left(v)) }
val ws: RDD[(K, Either[V, W])] = other.map { case (k, w) => (k, Right(w)) }
new PairRDDExtras(vs ++ ws).groupByKey(numSplits).flatMap {
(vs ++ ws).groupByKey(numSplits).flatMap {
case (k, seq) => {
val vbuf = new ArrayBuffer[V]
val wbuf = new ArrayBuffer[W]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment