diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70c7474a936dc62456cb0b8c159ed373925b4f80..70a99b33d753c20575e6470918f035135bb3df8e 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,20 +220,23 @@ object Bagel extends Logging {
    */
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
     sc: SparkContext,
-    grouped: RDD[(K, (Seq[C], Seq[V]))],
+    grouped: RDD[(K, (Iterable[C], Iterable[V]))],
     compute: (V, Option[C]) => (V, Array[M]),
     storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
-    val processed = grouped.flatMapValues {
-      case (_, vs) if vs.size == 0 => None
-      case (c, vs) =>
+    val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+      .flatMapValues {
+      case (_, vs) if !vs.hasNext => None
+      case (c, vs) => {
         val (newVert, newMsgs) =
-          compute(vs(0), c match {
-            case Seq(comb) => Some(comb)
-            case Seq() => None
-          })
+          compute(vs.next,
+            c.hasNext match {
+              case true => Some(c.next)
+              case false => None
+            }
+          )
 
         numMsgs += newMsgs.size
         if (newVert.active) {
@@ -241,6 +244,7 @@ object Bagel extends Logging {
         }
 
         Some((newVert, newMsgs))
+      }
     }.persist(storageLevel)
 
     // Force evaluation of processed RDD for accurate performance measurements
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 9596dbaf754886df2260e042214a0331035b1039..e6c5d859176787c667cbd1e9e2c1405e1fe966a3 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java
 
 import java.util.{Comparator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
+  def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+  def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
 
   /**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): JavaPairRDD[K, JList[V]] =
+  def groupByKey(): JavaPairRDD[K, JIterable[V]] =
     fromRDD(groupByResultToJava(rdd.groupByKey()))
 
   /**
@@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
 
   /**
@@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
-      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+      partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other)))
 
   /**
@@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
 
   /**
@@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * list of values for that key in `this` as well as `other`.
    */
   def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
 
   /**
@@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
 
   /** Alias for cogroup. */
-  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+  def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
     fromRDD(cogroupResultToJava(rdd.groupWith(other)))
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
-  : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+  : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
     fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
 
   /**
@@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
 
 object JavaPairRDD {
   private[spark]
-  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
-    rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+  def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
+    rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
   }
 
   private[spark]
   def cogroupResultToJava[K: ClassTag, V, W](
-      rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
-    rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+      rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
+    rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
   }
 
   private[spark]
   def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
-      rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+      rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
+      : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
     rddToPairRDDFunctions(rdd)
-      .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+      .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
   }
 
   def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6e8ec8e0c76294f2f94147f4798e5cf6883cb64e..ae577b500ccb48bf92757c8366e96099fe746906 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
 import java.lang.{Iterable => JIterable}
 
 import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
+  def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
     implicit val ctagK: ClassTag[K] = fakeClassTag
     implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
     JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 14386ff5b91272a85fd6afaf858cda6a67b6793e..a92a84b5342d1d35ce61b853fb2335b2dbac1b01 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Allows controlling the
    * partitioning of the resulting key-value pair RDD by passing a Partitioner.
    */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
     def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
     val bufs = combineByKey[ArrayBuffer[V]](
       createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+    bufs.mapValues(_.toIterable)
   }
 
   /**
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with into `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
     groupByKey(new HashPartitioner(numPartitions))
   }
 
@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+      for (v <- vs; w <- ws) yield (v, w)
     }
   }
 
@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (ws.isEmpty) {
-        vs.iterator.map(v => (v, None))
+        vs.map(v => (v, None))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+        for (v <- vs; w <- ws) yield (v, Some(w))
       }
     }
   }
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       : RDD[(K, (Option[V], W))] = {
     this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
       if (vs.isEmpty) {
-        ws.iterator.map(w => (None, w))
+        ws.map(w => (None, w))
       } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+        for (v <- vs; w <- ws) yield (Some(v), w)
       }
     }
   }
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * Group the values for each key in the RDD into a single sequence. Hash-partitions the
    * resulting RDD with the existing partitioner/parallelism level.
    */
-  def groupByKey(): RDD[(K, Seq[V])] = {
+  def groupByKey(): RDD[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(self))
   }
 
@@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Iterable[V], Iterable[W]))]  = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
@@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
     cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+      (vs.asInstanceOf[Seq[V]],
+       w1s.asInstanceOf[Seq[W1]],
+       w2s.asInstanceOf[Seq[W2]])
     }
   }
 
@@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }
 
@@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
@@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, new HashPartitioner(numPartitions))
   }
 
@@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    * tuple with the list of values for that key in `this`, `other1` and `other2`.
    */
   def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, new HashPartitioner(numPartitions))
   }
 
   /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(self, other))
   }
 
   /** Alias for cogroup. */
   def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
     cogroup(other1, other2, defaultPartitioner(self, other1, other2))
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index bf3c57ad41eb24ceb4d2276ac6fde95b90b3d5f9..74fa2a4fcd4010a87cd5fd366be913f2325634e1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
     groupBy[K](f, defaultPartitioner(this))
 
   /**
    * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
    * mapping to that key.
    */
-  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
     groupBy(f, new HashPartitioner(numPartitions))
 
   /**
    * Return an RDD of grouped items.
    */
-  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(p)
   }
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 762405be2a8f939b937e2b8097abf004435de1b2..ab2fdac5533498263643059bb4322a080fe704c3 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,10 +18,12 @@
 package org.apache.spark;
 
 import java.io.*;
+import java.lang.StringBuilder;
 import java.util.*;
 
 import scala.Tuple2;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
@@ -197,7 +199,7 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<String, String>("Oranges", "Citrus")
       ));
     Assert.assertEquals(2, categories.lookup("Oranges").size());
-    Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+    Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
   }
 
   @Test
@@ -209,15 +211,15 @@ public class JavaAPISuite implements Serializable {
         return x % 2 == 0;
       }
     };
-    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
     Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
 
     oddsAndEvens = rdd.groupBy(isOdd, 1);
     Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
   }
 
   @SuppressWarnings("unchecked")
@@ -232,9 +234,9 @@ public class JavaAPISuite implements Serializable {
       new Tuple2<String, Integer>("Oranges", 2),
       new Tuple2<String, Integer>("Apples", 3)
     ));
-    JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
-    Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
-    Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+    JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
+    Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+    Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
 
     cogrouped.collect();
   }
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f3fb64d87a2fd1df3b169d9c18505c6fd9fc59f0..12dbebcb286443fb18e9ad69f918d05f1691449e 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
             throw new Exception("Intentional task failure")
           }
         }
-        (k, v(0) * v(0))
+        (k, v.head * v.head)
       }.collect()
     FailureSuiteState.synchronized {
       assert(FailureSuiteState.tasksRun === 4)
@@ -137,5 +137,3 @@ class FailureSuite extends FunSuite with LocalSparkContext {
 
   // TODO: Need to add tests with shuffle fetch failures.
 }
-
-
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 627e9b5cd90606a417a009494d722a0e328ac1d0..867b28cc0d971d7afee2ecc7d1efc25ae5d75fe3 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
           (f: String => Unit) => {
             bl.value.map(f(_)); f("\u0001")
           },
-          (i: Tuple2[String, Seq[String]], f: String => Unit) => {
+          (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
             for (e <- i._2) {
               f(e + "_")
             }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index f9e994b13dfbc2dc46e54e45c7a419a99c1241f2..8f3e6bd21b752bd2ef686a96e7af65118961dbe0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.groupWith(rdd2).collect()
     assert(joined.size === 4)
-    assert(joined.toSet === Set(
-      (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
-      (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
-      (3, (ArrayBuffer(1), ArrayBuffer())),
-      (4, (ArrayBuffer(), ArrayBuffer('w')))
+    val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
+    assert(joinedSet === Set(
+      (1, (List(1, 2), List('x'))),
+      (2, (List(1), List('y', 'z'))),
+      (3, (List(1), List())),
+      (4, (List(), List('w')))
     ))
   }
 
@@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
     super.getRecordWriter(p1)
   }
 }
-
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fce1184d46364df589de437f78b838719b45d678..cdebefb67510c5d73f1741d1956f852f251fe866 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -174,9 +174,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
     assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
 
     // groupByKey
-    val result2 = rdd.groupByKey().collect()
+    val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
     assert(result2.toSet == Set[(Int, Seq[Int])]
-      ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
+      ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
   }
 
   test("simple cogroup") {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index eb70fb547564cc225aff8a7636854eadeb3c3f91..8513ba07e7705a91c88844f0c1ff94691b6ca28a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
 
 package org.apache.spark.examples;
 
+
 import scala.Tuple2;
+
+import com.google.common.collect.Iterables;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -26,8 +29,9 @@ import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
 import org.apache.spark.api.java.function.PairFunction;
 
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
 import java.util.regex.Pattern;
 
 /**
@@ -66,7 +70,7 @@ public final class JavaPageRank {
     JavaRDD<String> lines = ctx.textFile(args[1], 1);
 
     // Loads all URLs from input file and initialize their neighbors.
-    JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
+    JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
       @Override
       public Tuple2<String, String> call(String s) {
         String[] parts = SPACES.split(s);
@@ -75,9 +79,9 @@ public final class JavaPageRank {
     }).distinct().groupByKey().cache();
 
     // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
-    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+    JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
       @Override
-      public Double call(List<String> rs) {
+      public Double call(Iterable<String> rs) {
         return 1.0;
       }
     });
@@ -86,12 +90,13 @@ public final class JavaPageRank {
     for (int current = 0; current < Integer.parseInt(args[2]); current++) {
       // Calculates URL contributions to the rank of other URLs.
       JavaPairRDD<String, Double> contribs = links.join(ranks).values()
-        .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+        .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
           @Override
-          public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+          public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+	    int urlCount = Iterables.size(s._1);
             List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
-            for (String n : s._1()) {
-              results.add(new Tuple2<String, Double>(n, s._2() / s._1().size()));
+            for (String n : s._1) {
+              results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
             }
             return results;
           }
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 27afa6b642758f97593bcbee44f6a3d8a2d8845f..7aac6a13597e659b4e2fc3f9a90ce781ffa12223 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -115,12 +115,16 @@ object WikipediaPageRankStandalone {
     var ranks = links.mapValues { edges => defaultRank }
     for (i <- 1 to numIterations) {
       val contribs = links.groupWith(ranks).flatMap {
-        case (id, (linksWrapper, rankWrapper)) =>
-          if (linksWrapper.length > 0) {
-            if (rankWrapper.length > 0) {
-              linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
+        case (id, (linksWrapperIterable, rankWrapperIterable)) =>
+          val linksWrapper = linksWrapperIterable.iterator
+          val rankWrapper = rankWrapperIterable.iterator
+          if (linksWrapper.hasNext) {
+            val linksWrapperHead = linksWrapper.next
+            if (rankWrapper.hasNext) {
+              val rankWrapperHead = rankWrapper.next
+              linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
             } else {
-              linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
+              linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
             }
           } else {
             Array[(String, Double)]()
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index f67251217ed4a0c9b86b51bc293540c97c1547b3..7eb8b45fc3cf0e1a77864bac6dabef45cee0cc4f 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -23,6 +23,7 @@ import java.util.*;
 
 import scala.Tuple2;
 
+import com.google.common.collections.Iterables;
 import com.google.common.base.Optional;
 import com.google.common.io.Files;
 import org.apache.hadoop.io.IntWritable;
@@ -85,15 +86,15 @@ public class Java8APISuite implements Serializable {
   public void groupBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
-    JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
     Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
 
     oddsAndEvens = rdd.groupBy(isOdd, 1);
     Assert.assertEquals(2, oddsAndEvens.count());
-    Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size());  // Evens
-    Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
+    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
   }
 
   @Test
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 3e7cc648d1d37e7278feed52efc0d170a3bd83b0..0d97b7d92f1550010539af03fcade26ccbaa12d6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -69,11 +69,11 @@ class SVD {
 
   /**
    * Compute SVD using the current set parameters
-   * Returns (U, S, V)  such that A = USV^T 
+   * Returns (U, S, V)  such that A = USV^T
    * U is a row-by-row dense matrix
    * S is a simple double array of singular values
    * V is a 2d array matrix
-   * See [[denseSVD]] for more documentation 
+   * See [[denseSVD]] for more documentation
    */
   def compute(matrix: RDD[Array[Double]]):
   (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
@@ -393,5 +393,3 @@ object SVD {
     System.exit(0)
   }
 }
-
-
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 0cc9f48769f83b54cd7a92fdea24333a21960641..3124fac326d221a1061a0b713ce18be5da9bb444 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -421,12 +421,12 @@ class ALS private (
    * Compute the new feature vectors for a block of the users matrix given the list of factors
    * it received from each product and its InLinkBlock.
    */
-  private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+  private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
       rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
     : Array[Array[Double]] =
   {
     // Sort the incoming block factor messages by block ID and make them an array
-    val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
+    val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
     val numBlocks = blockFactors.length
     val numUsers = inLinkBlock.elementIds.length
 
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
index afe081295bfae2dc338485f00b3d837b70fd559e..87aac347579c7d9167c7c81a3a7e3f4afac5315d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
@@ -38,8 +38,10 @@ object LAUtils {
       case (i, cols) =>
         val rowArray = Array.ofDim[Double](n)
         var j = 0
-        while (j < cols.size) {
-          rowArray(cols(j)._1) = cols(j)._2
+        val colsItr = cols.iterator
+        while (colsItr.hasNext) {
+          val element = colsItr.next
+          rowArray(element._1) = element._2
           j += 1
         }
         MatrixRow(i, rowArray)
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b7778090584041286822eabbb0e1efe..6f94d26ef86a92c7b91e6fbb5a32cc6e6621c180 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -31,11 +31,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 
+from pyspark.resultiterable import ResultIterable
 
 def _do_python_join(rdd, other, numPartitions, dispatch):
     vs = rdd.map(lambda (k, v): (k, (1, v)))
     ws = other.map(lambda (k, v): (k, (2, v)))
-    return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
+    return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
 
 
 def python_join(rdd, other, numPartitions):
@@ -88,5 +89,5 @@ def python_cogroup(rdd, other, numPartitions):
                 vbuf.append(v)
             elif n == 2:
                 wbuf.append(v)
-        return (vbuf, wbuf)
+        return (ResultIterable(vbuf), ResultIterable(wbuf))
     return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fb27863e07f553fe20f74741fdd0d571cce898ee..91fc7e637e2c6a92a64eff6acb2734f302e9b747 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -38,6 +38,7 @@ from pyspark.join import python_join, python_left_outer_join, \
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler
 from pyspark.storagelevel import StorageLevel
+from pyspark.resultiterable import ResultIterable
 
 from py4j.java_collections import ListConverter, MapConverter
 
@@ -1118,7 +1119,7 @@ class RDD(object):
         Hash-partitions the resulting RDD with into numPartitions partitions.
 
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> sorted(x.groupByKey().collect())
+        >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
         [('a', [1, 1]), ('b', [1])]
         """
 
@@ -1133,7 +1134,7 @@ class RDD(object):
             return a + b
 
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                numPartitions)
+                numPartitions).mapValues(lambda x: ResultIterable(x))
 
     # TODO: add tests
     def flatMapValues(self, f):
@@ -1180,7 +1181,7 @@ class RDD(object):
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
-        >>> sorted(x.cogroup(y).collect())
+        >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
         [('a', ([1], [2])), ('b', ([4], []))]
         """
         return python_cogroup(self, other, numPartitions)
@@ -1217,7 +1218,7 @@ class RDD(object):
 
         >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
         >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
-        >>> sorted(x.cogroup(y).collect())
+        >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
         [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
         """
         return self.map(lambda x: (f(x), x))
@@ -1317,7 +1318,6 @@ class RDD(object):
     # keys in the pairs.  This could be an expensive operation, since those
     # hashes aren't retained.
 
-
 class PipelinedRDD(RDD):
     """
     Pipelined maps:
diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py
new file mode 100644
index 0000000000000000000000000000000000000000..7f418f8d2e29aa26764d7231cb419992dd1476a0
--- /dev/null
+++ b/python/pyspark/resultiterable.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["ResultIterable"]
+
+import collections
+
+class ResultIterable(collections.Iterable):
+    """
+    A special result iterable. This is used because the standard iterator can not be pickled
+    """
+    def __init__(self, data):
+        self.data = data
+        self.index = 0
+        self.maxindex = len(data)
+    def __iter__(self):
+        return iter(self.data)
+    def __len__(self):
+        return len(self.data)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index ac451d1913aaa38710638c1c2f2005990604a152..2ac943d7bf781989adf5223164d8d808d3e99e8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Long => JLong}
+import java.lang.{Long => JLong, Iterable => JIterable}
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
@@ -115,15 +115,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
    */
-  def groupByKey(): JavaPairDStream[K, JList[V]] =
-    dstream.groupByKey().mapValues(seqAsJavaList _)
+  def groupByKey(): JavaPairDStream[K, JIterable[V]] =
+    dstream.groupByKey().mapValues(asJavaIterable _)
 
   /**
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
-    dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+  def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] =
+    dstream.groupByKey(numPartitions).mapValues(asJavaIterable _)
 
   /**
    * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
@@ -131,8 +131,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
    * is used to control the partitioning of each RDD.
    */
-  def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
-    dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+  def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] =
+    dstream.groupByKey(partitioner).mapValues(asJavaIterable _)
 
   /**
    * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
@@ -196,8 +196,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
-    dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+  def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = {
+    dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _)
   }
 
   /**
@@ -211,8 +211,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    *                       DStream's batching interval
    */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
-  : JavaPairDStream[K, JList[V]] = {
-    dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
+  : JavaPairDStream[K, JIterable[V]] = {
+    dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _)
   }
 
   /**
@@ -227,9 +227,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * @param numPartitions  Number of partitions of each RDD in the new DStream.
    */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-  :JavaPairDStream[K, JList[V]] = {
+  :JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
-      .mapValues(seqAsJavaList _)
+      .mapValues(asJavaIterable _)
   }
 
   /**
@@ -247,9 +247,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ):JavaPairDStream[K, JList[V]] = {
+    ):JavaPairDStream[K, JIterable[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
-      .mapValues(seqAsJavaList _)
+      .mapValues(asJavaIterable _)
   }
 
   /**
@@ -518,9 +518,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
    * Hash partitioning is used to generate the RDDs with Spark's default number
    * of partitions.
    */
-  def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+  def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
     implicit val cm: ClassTag[W] = fakeClassTag
-    dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+    dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
   }
 
   /**
@@ -530,10 +530,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   def cogroup[W](
       other: JavaPairDStream[K, W],
       numPartitions: Int
-    ): JavaPairDStream[K, (JList[V], JList[W])] = {
+    ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
     implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, numPartitions)
-           .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+           .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
   }
 
   /**
@@ -543,10 +543,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   def cogroup[W](
       other: JavaPairDStream[K, W],
       partitioner: Partitioner
-    ): JavaPairDStream[K, (JList[V], JList[W])] = {
+    ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
     implicit val cm: ClassTag[W] = fakeClassTag
     dstream.cogroup(other.dstream, partitioner)
-           .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+           .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
   }
 
   /**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 24734969493605b8c69b121183321282ca8eae2b..354bc132dcdc0f752be77efa89248140f19ba2b6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -51,7 +51,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with Spark's default number of partitions.
    */
-  def groupByKey(): DStream[(K, Seq[V])] = {
+  def groupByKey(): DStream[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner())
   }
 
@@ -59,7 +59,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
    * generate the RDDs with `numPartitions` partitions.
    */
-  def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
+  def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
     groupByKey(defaultPartitioner(numPartitions))
   }
 
@@ -67,12 +67,12 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * Return a new DStream by applying `groupByKey` on each RDD. The supplied
    * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
    */
-  def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+  def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
     val createCombiner = (v: V) => ArrayBuffer[V](v)
     val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
     val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
     combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
-      .asInstanceOf[DStream[(K, Seq[V])]]
+      .asInstanceOf[DStream[(K, Iterable[V])]]
   }
 
   /**
@@ -126,7 +126,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * @param windowDuration width of the window; must be a multiple of this DStream's
    *                       batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+  def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
     groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
   }
 
@@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    *                       the new DStream will generate RDDs); must be a multiple of this
    *                       DStream's batching interval
    */
-  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+  def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+      : DStream[(K, Iterable[V])] =
   {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
   }
@@ -161,7 +162,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       numPartitions: Int
-    ): DStream[(K, Seq[V])] = {
+    ): DStream[(K, Iterable[V])] = {
     groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
   }
 
@@ -180,14 +181,14 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
       windowDuration: Duration,
       slideDuration: Duration,
       partitioner: Partitioner
-    ): DStream[(K, Seq[V])] = {
-    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
-    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+    ): DStream[(K, Iterable[V])] = {
+    val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
+    val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
     val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
     self.groupByKey(partitioner)
         .window(windowDuration, slideDuration)
         .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
-        .asInstanceOf[DStream[(K, Seq[V])]]
+        .asInstanceOf[DStream[(K, Iterable[V])]]
   }
 
   /**
@@ -438,7 +439,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * Hash partitioning is used to generate the RDDs with Spark's default number
    * of partitions.
    */
-  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+  def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner())
   }
 
@@ -447,7 +448,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
    * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
    */
   def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
-  : DStream[(K, (Seq[V], Seq[W]))] = {
+  : DStream[(K, (Iterable[V], Iterable[W]))] = {
     cogroup(other, defaultPartitioner(numPartitions))
   }
 
@@ -458,7 +459,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
   def cogroup[W: ClassTag](
       other: DStream[(K, W)],
       partitioner: Partitioner
-    ): DStream[(K, (Seq[V], Seq[W]))] = {
+    ): DStream[(K, (Iterable[V], Iterable[W]))] = {
     self.transformWith(
       other,
       (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 5f7d3ba26c65694aeb5a7fb640e1b6cd9fd19291..7e22268767de7a8fa8bcf5f4c0a2c096911d695f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -56,9 +56,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             // first map the cogrouped tuple to tuples of required type,
             // and then apply the update function
             val updateFuncLocal = updateFunc
-            val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+            val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
               val i = iterator.map(t => {
-                (t._1, t._2._1, t._2._2.headOption)
+                val itr = t._2._2.iterator
+                val headOption = itr.hasNext match {
+                  case true => Some(itr.next())
+                  case false => None
+                }
+                (t._1, t._2._1.toSeq, headOption)
               })
               updateFuncLocal(i)
             }
@@ -90,8 +95,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
             // first map the grouped tuple to tuples of required type,
             // and then apply the update function
             val updateFuncLocal = updateFunc
-            val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
-              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None)))
+            val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
+              updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
             }
 
             val groupedRDD = parentRDD.groupByKey(partitioner)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e93bf18b6d0b96668ef6adbf7c5a967cabcb5a6b..13fa64894b77338e83c93dffddc2122737da5791 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
 import org.junit.Test;
 import java.io.*;
 import java.util.*;
+import java.lang.Iterable;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -45,6 +46,18 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
 // see http://stackoverflow.com/questions/758570/.
 public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
 
+  public void equalIterator(Iterator<?> a, Iterator<?> b) {
+    while (a.hasNext() && b.hasNext()) {
+      Assert.assertEquals(a.next(), b.next());
+    }
+    Assert.assertEquals(a.hasNext(), b.hasNext());
+  }
+
+  public void equalIterable(Iterable<?> a, Iterable<?> b) {
+      equalIterator(a.iterator(), b.iterator());
+  }
+
+
   @SuppressWarnings("unchecked")
   @Test
   public void testCount() {
@@ -1016,11 +1029,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+    JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
     JavaTestUtils.attachTestOutputStream(grouped);
-    List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
+    List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected.size(), result.size());
+    Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
+    Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
+    while (resultItr.hasNext() && expectedItr.hasNext()) {
+      Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
+      Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
+      while (resultElements.hasNext() && expectedElements.hasNext()) {
+        Tuple2<String, Iterable<String>> resultElement = resultElements.next();
+        Tuple2<String, List<String>> expectedElement = expectedElements.next();
+        Assert.assertEquals(expectedElement._1(), resultElement._1());
+        equalIterable(expectedElement._2(), resultElement._2());
+      }
+      Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -1128,7 +1154,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
     JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
     JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
 
-    JavaPairDStream<String, List<Integer>> groupWindowed =
+    JavaPairDStream<String, Iterable<Integer>> groupWindowed =
         pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
     JavaTestUtils.attachTestOutputStream(groupWindowed);
     List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1471,11 +1497,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
         ssc, stringStringKVStream2, 1);
     JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
 
-    JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+    JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2);
     JavaTestUtils.attachTestOutputStream(grouped);
-    List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
-    Assert.assertEquals(expected, result);
+    List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+    Assert.assertEquals(expected.size(), result.size());
+    Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr = result.iterator();
+    Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator();
+    while (resultItr.hasNext() && expectedItr.hasNext()) {
+      Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements = resultItr.next().iterator();
+      Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator();
+      while (resultElements.hasNext() && expectedElements.hasNext()) {
+        Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement = resultElements.next();
+        Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next();
+        Assert.assertEquals(expectedElement._1(), resultElement._1());
+        equalIterable(expectedElement._2()._1(), resultElement._2()._1());
+        equalIterable(expectedElement._2()._2(), resultElement._2()._2());
+      }
+      Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+    }
   }
 
   @SuppressWarnings("unchecked")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bb73dbf29b64990a28591418a4bcb7d91fe30247..8aec27e39478a185ad45a8678239bff66734a7cc 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -117,7 +117,7 @@ class BasicOperationsSuite extends TestSuiteBase {
   test("groupByKey") {
     testOperation(
       Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
-      (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(),
+      (s: DStream[String]) => s.map(x => (x, 1)).groupByKey().mapValues(_.toSeq),
       Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ),
       true
     )
@@ -251,7 +251,7 @@ class BasicOperationsSuite extends TestSuiteBase {
       Seq(  )
     )
     val operation = (s1: DStream[String], s2: DStream[String]) => {
-      s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x")))
+      s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
     }
     testOperation(inputData1, inputData2, operation, outputData, true)
   }