From 94ba95bcb2c9ad99a4eacdf503fa763a44e157ad Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Tue, 12 Apr 2011 19:51:58 -0700
Subject: [PATCH] Added flatMapValues

---
 core/src/main/scala/spark/RDD.scala | 27 +++++++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 47bdb09986..12e2f4f902 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -251,12 +251,16 @@ extends RDD[Array[T]](prev.context) {
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
   
-  def mapValues[U](f: V => U): RDD[(K, U)] =
-  {
+  def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new MappedValuesRDD(self, cleanF)
   }
   
+  /** Flat-map counterpart of mapValues: each result of `f` is emitted under the value's key. */
+  def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+    new FlatMappedValuesRDD[K, V, U](self, self.context.clean(f))
+  }
+  
   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
     val part = self.partitioner match {
       case Some(p) => p
@@ -291,6 +295,21 @@ extends RDD[(K, U)](prev.context) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
   override val partitioner = prev.partitioner
-}
\ No newline at end of file
+  override def compute(split: Split) = // keys pass through; only the values go through f
+    prev.iterator(split).map { case (key, value) => (key, f(value)) }
+}
+
+/** RDD that applies `f` to each value of `prev` and pairs every result with the original key. */
+class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => Traversable[U])
+extends RDD[(K, U)](prev.context) {
+  // Splits, locality and partitioner are taken from prev; compute never changes a key.
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override val dependencies = List(new OneToOneDependency(prev))
+  override val partitioner = prev.partitioner
+  // Flatten on the iterator itself rather than via toStream: Stream memoizes its cells, which
+  // costs an allocation per record and risks keeping already-consumed records reachable.
+  override def compute(split: Split) =
+    prev.iterator(split).flatMap { case (k, v) => f(v).toIterator.map(x => (k, x)) }
+}
-- 
GitLab