diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 47bdb09986f67618008c1a602e74b1761be76c26..12e2f4f9028de89a5e64cb4c7e6b08eeb1b4bb3a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -251,12 +251,16 @@ extends RDD[Array[T]](prev.context) {
 
   def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
   
-  def mapValues[U](f: V => U): RDD[(K, U)] =
-  {
+  def mapValues[U](f: V => U): RDD[(K, U)] = {
     val cleanF = self.context.clean(f)
     new MappedValuesRDD(self, cleanF)
   }
   
+  def flatMapValues[U](f: V => Traversable[U]): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new FlatMappedValuesRDD(self, cleanF)
+  }
+
   def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
     val part = self.partitioner match {
       case Some(p) => p
@@ -291,6 +295,21 @@ extends RDD[(K, U)](prev.context) {
   override def splits = prev.splits
   override def preferredLocations(split: Split) = prev.preferredLocations(split)
   override val dependencies = List(new OneToOneDependency(prev))
-  override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
   override val partitioner = prev.partitioner
-}
\ No newline at end of file
+  override def compute(split: Split) =
+    prev.iterator(split).map { case (k, v) => (k, f(v)) }
+}
+
+class FlatMappedValuesRDD[K, V, U](
+  prev: RDD[(K, V)], f: V => Traversable[U])
+extends RDD[(K, U)](prev.context) {
+  override def splits = prev.splits
+  override def preferredLocations(split: Split) = prev.preferredLocations(split)
+  override val dependencies = List(new OneToOneDependency(prev))
+  override val partitioner = prev.partitioner
+  override def compute(split: Split) = {
+    prev.iterator(split).toStream.flatMap {
+      case (k, v) => f(v).map(x => (k, x))
+    }.iterator
+  }
+}
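
Reviewer note, not part of the patch: a minimal usage sketch of the new
flatMapValues, assuming a SparkContext named `sc` and the implicit conversion
that brings these pair-RDD methods into scope (all driver-side names here are
hypothetical).

    // Hypothetical driver snippet, not part of this patch.
    val pairs = sc.parallelize(Seq(("a", "x y"), ("b", "z")))

    // Unlike mapValues, each value may expand to zero or more results,
    // and every result keeps the original key.
    val exploded = pairs.flatMapValues(v => v.split(" ").toSeq)

    exploded.collect()  // Array(("a", "x"), ("a", "y"), ("b", "z"))

Because the keys are left untouched, FlatMappedValuesRDD can keep
prev.partitioner, which a plain flatMap over the pairs could not safely do.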
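
On the toStream/.iterator round-trip in compute: Stream's flatMap accepts any
Traversable-valued function, while Iterator's flatMap in the Scala version
targeted here presumably does not, so the patch widens to a Stream and then
narrows back to an Iterator to keep compute's per-partition contract. Below is
a self-contained sketch of the same transformation using plain Scala
collections only (all names hypothetical):

    // Stand-ins for one partition's contents and the user function f.
    val partition: Iterator[(String, String)] = Iterator(("a", "x y"), ("b", "z"))
    val f: String => Traversable[String] = v => v.split(" ").toSeq

    // Same shape as FlatMappedValuesRDD.compute: widen to a Stream, flatMap
    // with a Traversable-valued function, narrow back to an Iterator.
    val out = partition.toStream.flatMap { case (k, v) => f(v).map(x => (k, x)) }.iterator

    println(out.toList)  // List((a,x), (a,y), (b,z))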