From 5d01ebca3c7dcf9695c9abb5a7690b547ed83cfe Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Fri, 18 Oct 2013 18:45:10 -0700
Subject: [PATCH] Specializing IndexedRDD as VertexSetRDD.

1) This allows the index map to be optimized for Vids
2) This makes the code more readable
2) The Graph API can now return VertexSetRDDs from operations that produce results for vertices
---
 .../scala/org/apache/spark/graph/Graph.scala  |   4 +-
 .../org/apache/spark/graph/GraphOps.scala     |  14 +-
 .../{IndexedRDD.scala => VertexSetRDD.scala}  | 167 ++++++------
 .../apache/spark/graph/impl/GraphImpl.scala   | 238 +++++++++---------
 4 files changed, 213 insertions(+), 210 deletions(-)
 rename graph/src/main/scala/org/apache/spark/graph/{IndexedRDD.scala => VertexSetRDD.scala} (85%)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index 68f0394dd4..8c7ee1fcef 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -28,7 +28,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
    * @see Vertex for the vertex type.
    *
    */
-  val vertices: RDD[(Vid,VD)]
+  val vertices: VertexSetRDD[VD]
 
   /**
    * Get the Edges and their data as an RDD.  The entries in the RDD contain
@@ -257,7 +257,7 @@ abstract class Graph[VD: ClassManifest, ED: ClassManifest] {
   def mapReduceTriplets[A: ClassManifest](
       mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
       reduceFunc: (A, A) => A)
-    : RDD[(Vid, A)] 
+    : VertexSetRDD[A] 
 
 
   /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 5e8f082fda..cecd3ff291 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -13,11 +13,11 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
 
   lazy val numVertices: Long = graph.vertices.count()
 
-  lazy val inDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.In)
+  lazy val inDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.In)
 
-  lazy val outDegrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Out)
+  lazy val outDegrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Out)
 
-  lazy val degrees: RDD[(Vid, Int)] = degreesRDD(EdgeDirection.Both)
+  lazy val degrees: VertexSetRDD[Int] = degreesRDD(EdgeDirection.Both)
 
 
   /**
@@ -62,7 +62,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       mapFunc: (Vid, EdgeTriplet[VD, ED]) => Option[A],
       reduceFunc: (A, A) => A,
       dir: EdgeDirection)
-    : RDD[(Vid, A)] = {
+    : VertexSetRDD[A] = {
 
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
@@ -94,20 +94,20 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
   } // end of aggregateNeighbors
 
 
-  def collectNeighborIds(edgeDirection: EdgeDirection) : RDD[(Vid, Array[Vid])] = {
+  def collectNeighborIds(edgeDirection: EdgeDirection) : VertexSetRDD[Array[Vid]] = {
     val nbrs = graph.aggregateNeighbors[Array[Vid]](
       (vid, edge) => Some(Array(edge.otherVertexId(vid))),
       (a, b) => a ++ b,
       edgeDirection)
 
-    graph.vertices.leftOuterJoin(nbrs).mapValues{
+    graph.vertices.leftZipJoin(nbrs).mapValues{
       case (_, Some(nbrs)) => nbrs
       case (_, None) => Array.empty[Vid]
     }
   }
 
 
-  private def degreesRDD(edgeDirection: EdgeDirection): RDD[(Vid, Int)] = {
+  private def degreesRDD(edgeDirection: EdgeDirection): VertexSetRDD[Int] = {
     graph.aggregateNeighbors((vid, edge) => Some(1), _+_, edgeDirection)
   }
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
similarity index 85%
rename from graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala
rename to graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 900a46bb42..b3f1fa768c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/IndexedRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -39,47 +39,40 @@ import org.apache.spark.storage.StorageLevel
 
 
 
-/**
- * The BlockIndex is the internal map structure used inside the index 
- * of the IndexedRDD.
- */
-class BlockIndex[@specialized K: ClassManifest] extends JHashMap[K,Int]
-
 
 /**
- * The RDDIndex is an opaque type used to represent the organization 
+ * The VertexSetIndex is an opaque type used to represent the organization 
  * of values in an RDD
  */
-class RDDIndex[@specialized K: ClassManifest](private[spark] val rdd: RDD[BlockIndex[K]]) {
-  def persist(newLevel: StorageLevel): RDDIndex[K] = {
+class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
+  def persist(newLevel: StorageLevel): VertexSetIndex = {
     rdd.persist(newLevel)
     return this
   }
-
   def partitioner: Partitioner = rdd.partitioner.get
 }
 
 
 
 /**
- * An IndexedRDD[K,V] extends the RDD[(K,V)] by pre-indexing the keys and 
+ * An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and 
  * organizing the values to enable faster join operations.
  *
- * In addition to providing the basic RDD[(K,V)] functionality the IndexedRDD
- * exposes an index member which can be used to "key" other IndexedRDDs
+ * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
+ * exposes an index member which can be used to "key" other VertexSetRDDs
  * 
  */
-class IndexedRDD[K: ClassManifest, V: ClassManifest](
-    @transient val index:  RDDIndex[K],
+class VertexSetRDD[V: ClassManifest](
+    @transient val index:  VertexSetIndex,
     @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
-  extends RDD[(K, V)](index.rdd.context, 
+  extends RDD[(Vid, V)](index.rdd.context, 
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
 
   /**
-   * Construct a new IndexedRDD that is indexed by only the keys in the RDD
+   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD
    */
-   def reindex(): IndexedRDD[K,V] = IndexedRDD(this)
+   def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
 
 
   /**
@@ -109,20 +102,26 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 
 
   /**
-   * Caching an IndexedRDD causes the index and values to be cached separately. 
+   * Caching an VertexSetRDD causes the index and values to be cached separately. 
    */
-  override def persist(newLevel: StorageLevel): RDD[(K,V)] = {
+  override def persist(newLevel: StorageLevel): VertexSetRDD[V] = {
     index.persist(newLevel)
     valuesRDD.persist(newLevel)
     return this
   }
 
+  override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  override def cache(): VertexSetRDD[V] = persist()
+
+
 
   /**
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
    * this also retains the original RDD's partitioning.
    */
-  def mapValues[U: ClassManifest](f: V => U): IndexedRDD[K, U] = {
+  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
     val cleanF = index.rdd.context.clean(f)
     val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = 
     valuesRDD.mapPartitions(iter => iter.map{ 
@@ -133,7 +132,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
         }
         (newValues.toIndexedSeq, bs)
       }, preservesPartitioning = true)
-    new IndexedRDD[K,U](index, newValuesRDD)
+    new VertexSetRDD[U](index, newValuesRDD)
   }
 
 
@@ -141,7 +140,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
    * Pass each value in the key-value pair RDD through a map function without changing the keys;
    * this also retains the original RDD's partitioning.
    */
-  def mapValuesWithKeys[U: ClassManifest](f: (K, V) => U): IndexedRDD[K, U] = {
+  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
     val cleanF = index.rdd.context.clean(f)
     val newValues: RDD[ (IndexedSeq[U], BitSet) ] = 
       index.rdd.zipPartitions(valuesRDD){ 
@@ -158,11 +157,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       }
       Array((newValues.toIndexedSeq, bs)).iterator
     }
-    new IndexedRDD[K,U](index, newValues)
+    new VertexSetRDD[U](index, newValues)
   }
 
 
-  def zipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,W)] = {
+  def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
@@ -176,11 +175,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       val newValues = thisValues.view.zip(otherValues)
       Iterator((newValues.toIndexedSeq, newBS))
     }
-    new IndexedRDD(index, newValuesRDD)
+    new VertexSetRDD(index, newValuesRDD)
   }
 
 
-  def leftZipJoin[W: ClassManifest](other: IndexedRDD[K,W]): IndexedRDD[K,(V,Option[W])] = {
+  def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
@@ -195,18 +194,18 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       val newValues = thisValues.view.zip(otherOption)
       Iterator((newValues.toIndexedSeq, thisBS))
     }
-    new IndexedRDD(index, newValuesRDD)
+    new VertexSetRDD(index, newValuesRDD)
   }
 
 
 
   def leftJoin[W: ClassManifest](
-    other: RDD[(K,W)], merge: (W,W) => W = (a:W, b:W) => a):
-    IndexedRDD[K, (V, Option[W]) ] = {
+    other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
+    VertexSetRDD[(V, Option[W]) ] = {
     val cleanMerge = index.rdd.context.clean(merge)
 
     other match {
-      case other: IndexedRDD[_, _] if index == other.index => {
+      case other: VertexSetRDD[_] if index == other.index => {
         leftZipJoin(other)
       }    
       case _ => {
@@ -247,21 +246,21 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 
           Iterator((newValues.toIndexedSeq, thisBS))
         } // end of newValues
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
     }
   }
 
 
-/**
+  /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
    * list of values for that key in `this` as well as `other`.
    */
-  def cogroup[W: ClassManifest](other: RDD[(K, W)], partitioner: Partitioner): 
-  IndexedRDD[K, (Seq[V], Seq[W])] = {
+  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): 
+  VertexSetRDD[(Seq[V], Seq[W])] = {
     //RDD[(K, (Seq[V], Seq[W]))] = {
     other match {
-      case other: IndexedRDD[_, _] if index == other.index => {
+      case other: VertexSetRDD[_] if index == other.index => {
         // if both RDDs share exactly the same index and therefore the same super set of keys
         // then we simply merge the value RDDs. 
         // However it is possible that both RDDs are missing a value for a given key in 
@@ -284,9 +283,9 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
             }
             Iterator((newValues.toIndexedSeq, newBS))
         }
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
-      case other: IndexedRDD[_, _] 
+      case other: VertexSetRDD[_] 
         if index.rdd.partitioner == other.index.rdd.partitioner => {
         // If both RDDs are indexed using different indices but with the same partitioners
         // then we we need to first merge the indicies and then use the merged index to
@@ -298,7 +297,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
             assert(!thisIter.hasNext)
             val otherIndex = otherIter.next()
             assert(!otherIter.hasNext)
-            val newIndex = new BlockIndex[K]()
+            val newIndex = new VertexIdToIndexMap()
             // @todo Merge only the keys that correspond to non-null values
             // Merge the keys
             newIndex.putAll(thisIndex)
@@ -318,7 +317,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
               // Get the new index for this partition
               val newIndex = newIndexIter.next()
               assert(!newIndexIter.hasNext)
-              // Get the corresponding indicies and values for this and the other IndexedRDD
+              // Get the corresponding indicies and values for this and the other VertexSetRDD
               val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
               assert(!thisTuplesIter.hasNext)
               val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
@@ -347,7 +346,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
               }
               Iterator((newValues.toIndexedSeq, newBS))
             })
-        new IndexedRDD(new RDDIndex(newIndex), newValues)
+        new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
       }
       case _ => {
         // Get the partitioner from the index
@@ -360,20 +359,20 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           if (other.partitioner == Some(partitioner)) {
             other
           } else {
-            new ShuffledRDD[K, W, (K,W)](other, partitioner)
+            other.partitionBy(partitioner)
           }
         // Join the other RDD with this RDD building a new valueset and new index on the fly
         val groups = tuples.zipPartitions(otherShuffled)(
           (thisTuplesIter, otherTuplesIter) => {
-            // Get the corresponding indicies and values for this IndexedRDD
+            // Get the corresponding indicies and values for this VertexSetRDD
             val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
             assert(!thisTuplesIter.hasNext())
             // Construct a new index
-            val newIndex = thisIndex.clone().asInstanceOf[BlockIndex[K]]
+            val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
             // Construct a new array Buffer to store the values
             val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
             val newBS = new BitSet(thisValues.size)
-            // populate the newValues with the values in this IndexedRDD
+            // populate the newValues with the values in this VertexSetRDD
             for ((k,i) <- thisIndex) {
               if (thisBS(i)) {
                 newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W]) 
@@ -415,14 +414,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
         val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = 
           groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
           
-        new IndexedRDD[K, (Seq[V], Seq[W])](new RDDIndex(newIndex), newValues)
+        new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
       }
     }
   }
 
 
   // 
-  // def zipJoinToRDD[W: ClassManifest](other: IndexedRDD[K,W]): RDD[(K,(V,W))] = {
+  // def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = {
   //   if(index != other.index) {
   //     throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
   //   }
@@ -447,11 +446,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     other: RDD[(K,W)])(
     f: (K, V, W) => Z, 
     merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    IndexedRDD[K,Z] = {
+    VertexSetRDD[K,Z] = {
     val cleanF = index.rdd.context.clean(f)
     val cleanMerge = index.rdd.context.clean(merge)
     other match {
-      case other: IndexedRDD[_, _] if index == other.index => {
+      case other: VertexSetRDD[_, _] if index == other.index => {
         val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
           (thisIndexIter, thisIter, otherIter) => 
           val index = thisIndexIter.next()
@@ -469,7 +468,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           }
           List((newValues, newBS)).iterator
         }
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
     
       case _ => {
@@ -508,7 +507,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           } 
           List((newValues, tempBS)).iterator
         } // end of newValues
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
     }
   }
@@ -519,11 +518,11 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     other: RDD[(K,W)])(
     f: (K, V, Option[W]) => Z, 
     merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    IndexedRDD[K,Z] = {
+    VertexSetRDD[K,Z] = {
     val cleanF = index.rdd.context.clean(f)
     val cleanMerge = index.rdd.context.clean(merge)
     other match {
-      case other: IndexedRDD[_, _] if index == other.index => {
+      case other: VertexSetRDD[_, _] if index == other.index => {
         val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
           (thisIndexIter, thisIter, otherIter) => 
           val index = thisIndexIter.next()
@@ -541,7 +540,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           }
           List((newValues, thisBS)).iterator
         }
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
     
       case _ => {
@@ -584,7 +583,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
           }
           List((newValues, thisBS)).iterator
         } // end of newValues
-        new IndexedRDD(index, newValues) 
+        new VertexSetRDD(index, newValues) 
       }
     }
   }
@@ -593,7 +592,7 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
 
 
 
-  override def filter(f: Tuple2[K,V] => Boolean): RDD[(K,V)] = {
+  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
     val cleanF = index.rdd.context.clean(f)
     val newValues = index.rdd.zipPartitions(valuesRDD){ 
       (keysIter, valuesIter) => 
@@ -609,14 +608,14 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
       }
       Array((oldValues, newBS)).iterator
     }
-    new IndexedRDD[K,V](index, newValues)
+    new VertexSetRDD[V](index, newValues)
   }
 
 
   /**
    * Provide the RDD[(K,V)] equivalent output. 
    */
-  override def compute(part: Partition, context: TaskContext): Iterator[(K, V)] = {
+  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
     tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => 
       // Walk the index to construct the key, value pairs
       indexMap.iterator 
@@ -629,27 +628,27 @@ class IndexedRDD[K: ClassManifest, V: ClassManifest](
     }
   }
 
-} // End of IndexedRDD
+} // End of VertexSetRDD
 
 
 
 
-object IndexedRDD {
+object VertexSetRDD {
 
 
-  def apply[K: ClassManifest, V: ClassManifest](rdd: RDD[(K,V)]): IndexedRDD[K,V] = 
+  def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] = 
     apply(rdd, (a:V, b:V) => a )
 
-  def apply[K: ClassManifest, V: ClassManifest](
-    rdd: RDD[(K,V)], reduceFunc: (V, V) => V): IndexedRDD[K,V] = {
+  def apply[V: ClassManifest](
+    rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
     // Preaggregate and shuffle if necessary
     // Preaggregation.
-    val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
+    val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
     val partitioner = new HashPartitioner(rdd.partitions.size)
     val preAgg = rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
 
     val groups = preAgg.mapPartitions( iter => {
-      val indexMap = new BlockIndex[K]()
+      val indexMap = new VertexIdToIndexMap()
       val values = new ArrayBuffer[V]
       val bs = new BitSet
       for ((k,v) <- iter) {
@@ -669,19 +668,19 @@ object IndexedRDD {
     val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
     val values: RDD[(IndexedSeq[V], BitSet)] = 
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
-    new IndexedRDD[K,V](new RDDIndex(index), values)
+    new VertexSetRDD[V](new VertexSetIndex(index), values)
   }
 
 
 
-  def apply[K: ClassManifest, V: ClassManifest](
-    rdd: RDD[(K,V)], index: RDDIndex[K]): IndexedRDD[K,V] = 
+  def apply[V: ClassManifest](
+    rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = 
     apply(rdd, index, (a:V,b:V) => a)
 
 
-  def apply[K: ClassManifest, V: ClassManifest](
-    rdd: RDD[(K,V)], index: RDDIndex[K], 
-    reduceFunc: (V, V) => V): IndexedRDD[K,V] = 
+  def apply[V: ClassManifest](
+    rdd: RDD[(Vid,V)], index: VertexSetIndex,
+    reduceFunc: (V, V) => V): VertexSetRDD[V] = 
     apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
   // {
   //   // Get the index Partitioner
@@ -721,16 +720,16 @@ object IndexedRDD {
   //     }
   //     List((values, bs)).iterator
   //   })
-  //   new IndexedRDD[K,V](index, values)
+  //   new VertexSetRDD[K,V](index, values)
   // } // end of apply
 
 
-  def apply[K: ClassManifest, V: ClassManifest, C: ClassManifest](
-    rdd: RDD[(K,V)], 
-    index: RDDIndex[K],
+  def apply[V: ClassManifest, C: ClassManifest](
+    rdd: RDD[(Vid,V)], 
+    index: VertexSetIndex,
     createCombiner: V => C,
     mergeValue: (C, V) => C,
-    mergeCombiners: (C, C) => C): IndexedRDD[K,C] = {
+    mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
     // Get the index Partitioner
     val partitioner = index.rdd.partitioner match {
       case Some(p) => p
@@ -740,7 +739,7 @@ object IndexedRDD {
     val partitioned = 
       if (rdd.partitioner != Some(partitioner)) {
         // Preaggregation.
-        val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, 
+        val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue, 
           mergeCombiners)
         rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
       } else {
@@ -769,15 +768,15 @@ object IndexedRDD {
       }
       Iterator((values, bs))
     })
-    new IndexedRDD(index, values)
+    new VertexSetRDD(index, values)
   } // end of apply
 
 
   /**
    * Construct and index of the unique values in a given RDD.
    */
-  def makeIndex[K: ClassManifest](keys: RDD[K], 
-    partitioner: Option[Partitioner] = None): RDDIndex[K] = {
+  def makeIndex(keys: RDD[Vid], 
+    partitioner: Option[Partitioner] = None): VertexSetIndex = {
     // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD
     // Ugly hack :-(.  In order to partition the keys they must have values. 
     val tbl = keys.mapPartitions(_.map(k => (k, false)), true)
@@ -786,7 +785,7 @@ object IndexedRDD {
       case None =>  {
         if (tbl.partitioner.isEmpty) {
           // @todo: I don't need the boolean its only there to be the second type of the shuffle. 
-          new ShuffledRDD[K, Boolean, (K, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
+          new ShuffledRDD[Vid, Boolean, (Vid, Boolean)](tbl, Partitioner.defaultPartitioner(tbl))
         } else { tbl }
       }
       case Some(partitioner) => 
@@ -794,7 +793,7 @@ object IndexedRDD {
     }
 
     val index = shuffledTbl.mapPartitions( iter => {
-      val indexMap = new BlockIndex[K]()
+      val indexMap = new VertexIdToIndexMap()
       for ( (k,_) <- iter ){
         if(!indexMap.contains(k)){
           val ind = indexMap.size
@@ -803,10 +802,10 @@ object IndexedRDD {
       }
       Iterator(indexMap)
       }, true).cache
-    new RDDIndex(index)
+    new VertexSetIndex(index)
   }
 
-} // end of object IndexedRDD
+} // end of object VertexSetRDD
 
 
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index c6875f0c9c..bdf79bf9f0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -66,17 +66,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
 
 object EdgeTripletBuilder {
   def makeTriplets[VD: ClassManifest, ED: ClassManifest]( 
-    localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
-    vTableReplicatedValues: IndexedRDD[Pid, Array[VD]],
-    eTable: IndexedRDD[Pid, EdgePartition[ED]]): RDD[EdgeTriplet[VD, ED]] = {
-    val iterFun = (iter: Iterator[(Pid, ((VertexIdToIndexMap, Array[VD]), EdgePartition[ED]))]) => {
-      val (pid, ((vidToIndex, vertexArray), edgePartition)) = iter.next()
-      assert(iter.hasNext == false)
+    localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+    vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
+    eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
+    localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
+      (vidMapIter, replicatedValuesIter, eTableIter) =>
+      val (_, vidToIndex) = vidMapIter.next()
+      val (_, vertexArray) = replicatedValuesIter.next()
+      val (_, edgePartition) = eTableIter.next()
       new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
     }
-    ClosureCleaner.clean(iterFun) 
-    localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable)
-      .mapPartitions( iterFun ) // end of map partition
   }
 }
 
@@ -100,30 +99,30 @@ object EdgeTripletBuilder {
  * A Graph RDD that supports computation on graphs.
  */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
-    @transient val vTable: IndexedRDD[Vid, VD],
-    @transient val vid2pid: IndexedRDD[Vid, Array[Pid]],
-    @transient val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap],
-    @transient val eTable: IndexedRDD[Pid, EdgePartition[ED]])
+    @transient val vTable: VertexSetRDD[VD],
+    @transient val vid2pid: VertexSetRDD[Array[Pid]],
+    @transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+    @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
   extends Graph[VD, ED] {
 
 //  def this() = this(null,null,null)
 
 
   /**
-   * (localVidMap: IndexedRDD[Pid, VertexIdToIndexMap]) is a version of the
+   * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
    * vertex data after it is replicated. Within each partition, it holds a map
    * from vertex ID to the index where that vertex's attribute is stored. This
    * index refers to an array in the same partition in vTableReplicatedValues.
    *
-   * (vTableReplicatedValues: IndexedRDD[Pid, Array[VD]]) holds the vertex data
+   * (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
    * and is arranged as described above.
    */
-  @transient val vTableReplicatedValues =
+  @transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
     createVTableReplicated(vTable, vid2pid, localVidMap)
 
 
   /** Return a RDD of vertices. */
-  @transient override val vertices: RDD[(Vid, VD)] = vTable
+  @transient override val vertices = vTable
 
 
   /** Return a RDD of edges. */
@@ -177,36 +176,40 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
 
   override def reverse: Graph[VD, ED] = {
-    val etable = eTable.mapValues( _.reverse ).asInstanceOf[IndexedRDD[Pid, EdgePartition[ED]]] 
-    new GraphImpl(vTable, vid2pid, localVidMap, etable)
+    val newEtable = eTable.mapPartitions( _.map{ case (pid, epart) => (pid, epart.reverse) }, 
+      preservesPartitioning = true)
+    new GraphImpl(vTable, vid2pid, localVidMap, newEtable)
   }
 
 
   override def mapVertices[VD2: ClassManifest](f: (Vid, VD) => VD2): Graph[VD2, ED] = {
     val newVTable = vTable.mapValuesWithKeys((vid, data) => f(vid, data))
-      .asInstanceOf[IndexedRDD[Vid, VD2]]
     new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
   }
 
   override def mapEdges[ED2: ClassManifest](f: Edge[ED] => ED2): Graph[VD, ED2] = {
-    val newETable = eTable.mapValues(eBlock => eBlock.map(f))
-      .asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+    val newETable = eTable.mapPartitions(_.map{ case (pid, epart) => (pid, epart.map(f)) },
+      preservesPartitioning = true)
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
 
   override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
     Graph[VD, ED2] = {
-    val newETable = eTable.zipJoin(localVidMap).zipJoin(vTableReplicatedValues).mapValues{ 
-      case ((edgePartition, vidToIndex), vertexArray) =>
-        val et = new EdgeTriplet[VD, ED]
-        edgePartition.map{e =>
-          et.set(e)
-          et.srcAttr = vertexArray(vidToIndex(e.srcId))
-          et.dstAttr = vertexArray(vidToIndex(e.dstId))
-          f(et)
-        }
-    }.asInstanceOf[IndexedRDD[Pid, EdgePartition[ED2]]]
+    val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ 
+      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+      val (pid, edgePartition) = edgePartitionIter.next()
+      val (_, vidToIndex) = vidToIndexIter.next()
+      val (_, vertexArray) = vertexArrayIter.next()
+      val et = new EdgeTriplet[VD, ED]
+      val newEdgePartition = edgePartition.map{e =>
+        et.set(e)
+        et.srcAttr = vertexArray(vidToIndex(e.srcId))
+        et.dstAttr = vertexArray(vidToIndex(e.dstId))
+        f(et)
+      }
+      Iterator((pid, newEdgePartition))
+    }
     new GraphImpl(vTable, vid2pid, localVidMap, newETable)
   }
 
@@ -238,7 +241,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
     // Reuse the partitioner (but not the index) from this graph
     val newVTable = 
-      IndexedRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
+      VertexSetRDD(vertices.filter(v => vpred(v._1, v._2)).partitionBy(vTable.index.partitioner))
 
 
     // Restrict the set of edges to those that satisfy the vertex and the edge predicate.
@@ -309,53 +312,56 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   override def mapReduceTriplets[A: ClassManifest](
       mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
       reduceFunc: (A, A) => A)
-    : RDD[(Vid, A)] = {
+    : VertexSetRDD[A] = {
 
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
 
     // Map and preaggregate 
-    val preAgg = localVidMap.zipJoin(vTableReplicatedValues).zipJoin(eTable).flatMap{
-      case (pid, ((vidToIndex, vertexArray), edgePartition)) => 
-        // We can reuse the vidToIndex map for aggregation here as well.
-        /** @todo Since this has the downside of not allowing "messages" to arbitrary
-         * vertices we should consider just using a fresh map.
-         */
-        val msgArray = new Array[A](vertexArray.size)
-        val msgBS = new BitSet(vertexArray.size)
-        // Iterate over the partition
-        val et = new EdgeTriplet[VD, ED]
-        edgePartition.foreach{e => 
-          et.set(e)
-          et.srcAttr = vertexArray(vidToIndex(e.srcId))
-          et.dstAttr = vertexArray(vidToIndex(e.dstId))
-          mapFunc(et).foreach{ case (vid, msg) =>
-            // verify that the vid is valid
-            assert(vid == et.srcId || vid == et.dstId)
-            val ind = vidToIndex(vid)
-            // Populate the aggregator map
-            if(msgBS(ind)) {
-              msgArray(ind) = reduceFunc(msgArray(ind), msg)
-            } else { 
-              msgArray(ind) = msg
-              msgBS(ind) = true
-            }
+    val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ 
+      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+      val (pid, edgePartition) = edgePartitionIter.next()
+      val (_, vidToIndex) = vidToIndexIter.next()
+      val (_, vertexArray) = vertexArrayIter.next()
+      // We can reuse the vidToIndex map for aggregation here as well.
+      /** @todo Since this has the downside of not allowing "messages" to arbitrary
+       * vertices we should consider just using a fresh map.
+       */
+      val msgArray = new Array[A](vertexArray.size)
+      val msgBS = new BitSet(vertexArray.size)
+      // Iterate over the partition
+      val et = new EdgeTriplet[VD, ED]
+      edgePartition.foreach{e => 
+        et.set(e)
+        et.srcAttr = vertexArray(vidToIndex(e.srcId))
+        et.dstAttr = vertexArray(vidToIndex(e.dstId))
+        mapFunc(et).foreach{ case (vid, msg) =>
+          // verify that the vid is valid
+          assert(vid == et.srcId || vid == et.dstId)
+          val ind = vidToIndex(vid)
+          // Populate the aggregator map
+          if(msgBS(ind)) {
+            msgArray(ind) = reduceFunc(msgArray(ind), msg)
+          } else { 
+            msgArray(ind) = msg
+            msgBS(ind) = true
           }
         }
-        // Return the aggregate map
-        vidToIndex.long2IntEntrySet().fastIterator()
-        // Remove the entries that did not receive a message
-        .filter{ entry => msgBS(entry.getValue()) }
-        // Construct the actual pairs
-        .map{ entry => 
-          val vid = entry.getLongKey()
-          val ind = entry.getValue()
-          val msg = msgArray(ind)
-          (vid, msg)
-        }
+      }
+      // Return the aggregate map
+      vidToIndex.long2IntEntrySet().fastIterator()
+      // Remove the entries that did not receive a message
+      .filter{ entry => msgBS(entry.getValue()) }
+      // Construct the actual pairs
+      .map{ entry => 
+        val vid = entry.getLongKey()
+        val ind = entry.getValue()
+        val msg = msgArray(ind)
+        (vid, msg)
+      }
       }.partitionBy(vTable.index.rdd.partitioner.get)
     // do the final reduction reusing the index map
-    IndexedRDD(preAgg, vTable.index, reduceFunc)
+    VertexSetRDD(preAgg, vTable.index, reduceFunc)
   }
 
 
@@ -402,7 +408,7 @@ object GraphImpl {
     defaultVertexAttr: VD,
     mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = {
 
-    val vtable = IndexedRDD(vertices, mergeFunc) 
+    val vtable = VertexSetRDD(vertices, mergeFunc) 
     /** 
      * @todo Verify that there are no edges that contain vertices 
      * that are not in vTable.  This should probably be resolved:
@@ -432,54 +438,54 @@ object GraphImpl {
    * containing all the edges in a partition.
    */
   protected def createETable[ED: ClassManifest](edges: RDD[Edge[ED]])
-    : IndexedRDD[Pid, EdgePartition[ED]] = {
-      // Get the number of partitions
-      val numPartitions = edges.partitions.size
-      val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt 
-    IndexedRDD(edges.map { e =>
-        // Random partitioning based on the source vertex id.
-        // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
-        // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
-        val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
-        //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
-
-        // Should we be using 3-tuple or an optimized class
-        MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+    : RDD[(Pid, EdgePartition[ED])] = {
+    // Get the number of partitions
+    val numPartitions = edges.partitions.size
+    val ceilSqrt: Pid = math.ceil(math.sqrt(numPartitions)).toInt 
+    edges.map { e =>
+      // Random partitioning based on the source vertex id.
+      // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
+      // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+      val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
+      //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
+
+      // Should we be using 3-tuple or an optimized class
+      MessageToPartition(part, (e.srcId, e.dstId, e.attr))
+    }
+    .partitionBy(new HashPartitioner(numPartitions))
+    .mapPartitionsWithIndex( (pid, iter) => {
+      val builder = new EdgePartitionBuilder[ED]
+      iter.foreach { message =>
+        val data = message.data
+        builder.add(data._1, data._2, data._3)
       }
-      .partitionBy(new HashPartitioner(numPartitions))
-      .mapPartitionsWithIndex({ (pid, iter) =>
-        val builder = new EdgePartitionBuilder[ED]
-        iter.foreach { message =>
-          val data = message.data
-          builder.add(data._1, data._2, data._3)
-        }
-        val edgePartition = builder.toEdgePartition
-        Iterator((pid, edgePartition))
-      }, preservesPartitioning = true))
+      val edgePartition = builder.toEdgePartition
+      Iterator((pid, edgePartition))
+    }, preservesPartitioning = true).cache()
   }
 
 
   protected def createVid2Pid[ED: ClassManifest](
-    eTable: IndexedRDD[Pid, EdgePartition[ED]],
-    vTableIndex: RDDIndex[Vid]): IndexedRDD[Vid, Array[Pid]] = {
+    eTable: RDD[(Pid, EdgePartition[ED])],
+    vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
     val preAgg = eTable.mapPartitions { iter =>
       val (pid, edgePartition) = iter.next()
       val vSet = new VertexSet
       edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
       vSet.iterator.map { vid => (vid.toLong, pid) }
     }
-    IndexedRDD[Vid, Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, 
+    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex, 
       (p: Pid) => ArrayBuffer(p),
       (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
       (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
-      .mapValues(a => a.toArray).asInstanceOf[IndexedRDD[Vid, Array[Pid]]]
+      .mapValues(a => a.toArray).cache()
   }
 
 
-  protected def createLocalVidMap[ED: ClassManifest](
-    eTable: IndexedRDD[Pid, EdgePartition[ED]]): IndexedRDD[Pid, VertexIdToIndexMap] = {
-    eTable.mapValues{ epart =>
-      val vidToIndex = new VertexIdToIndexMap()
+  protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]): 
+    RDD[(Pid, VertexIdToIndexMap)] = {
+    eTable.mapPartitions( _.map{ case (pid, epart) =>
+      val vidToIndex = new VertexIdToIndexMap
       var i = 0
       epart.foreach{ e => 
         if(!vidToIndex.contains(e.srcId)) {
@@ -491,16 +497,16 @@ object GraphImpl {
           i += 1
         }
       }
-      vidToIndex
-    }
+      (pid, vidToIndex)
+    }, preservesPartitioning = true).cache()
   }
 
 
   protected def createVTableReplicated[VD: ClassManifest](
-      vTable: IndexedRDD[Vid, VD], 
-      vid2pid: IndexedRDD[Vid, Array[Pid]],
-      replicationMap: IndexedRDD[Pid, VertexIdToIndexMap]): 
-    IndexedRDD[Pid, Array[VD]] = {
+      vTable: VertexSetRDD[VD], 
+      vid2pid: VertexSetRDD[Array[Pid]],
+      replicationMap: RDD[(Pid, VertexIdToIndexMap)]): 
+    RDD[(Pid, Array[VD])] = {
     // Join vid2pid and vTable, generate a shuffle dependency on the joined 
     // result, and get the shuffle id so we can use it on the slave.
     val msgsByPartition = vTable.zipJoin(vid2pid)
@@ -509,9 +515,9 @@ object GraphImpl {
       }
       .partitionBy(replicationMap.partitioner.get).cache()
    
-    val newValuesRDD = replicationMap.valuesRDD.zipPartitions(msgsByPartition){ 
+    replicationMap.zipPartitions(msgsByPartition){ 
       (mapIter, msgsIter) =>
-      val (IndexedSeq(vidToIndex), bs) = mapIter.next()
+      val (pid, vidToIndex) = mapIter.next()
       assert(!mapIter.hasNext)
       // Populate the vertex array using the vidToIndex map
       val vertexArray = new Array[VD](vidToIndex.size)
@@ -519,14 +525,12 @@ object GraphImpl {
         val ind = vidToIndex(msg.data._1)
         vertexArray(ind) = msg.data._2
       }
-      Iterator((IndexedSeq(vertexArray), bs))
-    }
-
-    new IndexedRDD(replicationMap.index, newValuesRDD)
+      Iterator((pid, vertexArray))
+    }.cache()
 
     // @todo assert edge table has partitioner
 
-    // val localVidMap: IndexedRDD[Pid, VertexIdToIndexMap] =
+    // val localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap] =
     //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
     //     val vidToIndex = new VertexIdToIndexMap
     //     var i = 0
@@ -537,7 +541,7 @@ object GraphImpl {
     //     Array((pid, vidToIndex)).iterator
     //   }, preservesPartitioning = true).indexed(eTable.index)
 
-    // val vTableReplicatedValues: IndexedRDD[Pid, Array[VD]] =
+    // val vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]] =
     //   msgsByPartition.mapPartitionsWithIndex( (pid, iter) => {
     //     val vertexArray = ArrayBuilder.make[VD]
     //     for (msg <- iter) {
-- 
GitLab