diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index b3f1fa768c86dc33865bfa5b834e82eff036385a..cb75da6c211cbd0331019bbd9bea81dc496ca959 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -41,28 +41,62 @@ import org.apache.spark.storage.StorageLevel
 
 
 /**
- * The VertexSetIndex is an opaque type used to represent the organization 
- * of values in an RDD
+ * The `VertexSetIndex` maintains the per-partition mapping from vertex id
+ * to the corresponding location in the per-partition values array. 
+ * This class is meant to be an opaque type. 
+ * 
  */
 class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
+  /**
+   * The persist function behaves like the standard RDD persist
+   */
   def persist(newLevel: StorageLevel): VertexSetIndex = {
     rdd.persist(newLevel)
     return this
   }
+
+  /**
+   * Returns the partitioner object of the underlying RDD.  This is used
+   * by the VertexSetRDD to partition the values RDD.
+   */
   def partitioner: Partitioner = rdd.partitioner.get
-}
+} // end of VertexSetIndex
 
 
 
 /**
- * An VertexSetRDD[V] extends the RDD[(Vid,V)] by pre-indexing the keys and 
- * organizing the values to enable faster join operations.
+ * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there is only 
+ * one entry for each vertex and by pre-indexing the entries for fast, efficient
+ * joins.
  *
  * In addition to providing the basic RDD[(Vid,V)] functionality the VertexSetRDD
  * exposes an index member which can be used to "key" other VertexSetRDDs
  * 
+ * @tparam V the vertex attribute associated with each vertex in the set.
+ *
+ * @param index the index which contains the vertex id information and is used
+ * to organize the values in the RDD.
+ * @param valuesRDD the values RDD contains the actual vertex attributes organized
+ * as an array within each partition.  
+ *
+ * To construct a `VertexSetRDD` use the singleton object:
+ *
+ * @example Construct a `VertexSetRDD` from a plain RDD
+ * {{{
+ * // Construct an intial vertex set
+ * val someData: RDD[(Vid, SomeType)] = loadData(someFile)
+ * val vset = VertexSetRDD(someData)
+ * // If there were redundant values in someData we would use a reduceFunc
+ * val vset2 = VertexSetRDD(someData, reduceFunc)
+ * // Finally we can use the VertexSetRDD to index another dataset
+ * val otherData: RDD[(Vid, OtherType)] = loadData(otherFile)
+ * val vset3 = VertexSetRDD(otherData, vset.index)
+ * // Now we can construct very fast joins between the two sets
+ * val vset4: VertexSetRDD[(SomeType, OtherType)] = vset.leftJoin(vset3)
+ * }}}
+ *
  */
-class VertexSetRDD[V: ClassManifest](
+class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index:  VertexSetIndex,
     @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context, 
@@ -70,20 +104,23 @@ class VertexSetRDD[V: ClassManifest](
 
 
   /**
-   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD
+   * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
+   * The resulting VertexSet will be based on a different index and can
+   * no longer be quickly joined with this RDD.
    */
    def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
 
 
   /**
    * An internal representation which joins the block indices with the values
+   * This is used by the compute function to emulate RDD[(Vid, V)]
    */
   protected[spark] val tuples = 
     new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
 
 
   /**
-   * The partitioner is defined by the index
+   * The partitioner is defined by the index.  
    */
   override val partitioner = index.rdd.partitioner
   
@@ -95,7 +132,8 @@ class VertexSetRDD[V: ClassManifest](
   
 
   /**
-   * The preferred locations are computed based on the preferred locations of the tuples.
+   * The preferred locations are computed based on the preferred 
+   * locations of the tuples. 
    */
   override def getPreferredLocations(s: Partition): Seq[String] = 
     tuples.getPreferredLocations(s)
@@ -110,75 +148,170 @@ class VertexSetRDD[V: ClassManifest](
     return this
   }
 
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
 
+
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): VertexSetRDD[V] = persist()
 
 
-
   /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * Provide the RDD[(K,V)] equivalent output. 
    */
-  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = 
-    valuesRDD.mapPartitions(iter => iter.map{ 
-      case (values, bs) => 
-        val newValues = new Array[U](values.size)
-        for ( ind <- bs ) {
-          newValues(ind) = f(values(ind))
-        }
-        (newValues.toIndexedSeq, bs)
-      }, preservesPartitioning = true)
-    new VertexSetRDD[U](index, newValuesRDD)
-  }
+  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
+    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => 
+      // Walk the index to construct the key, value pairs
+      indexMap.iterator 
+        // Extract rows with key value pairs and indicators
+        .map{ case (k, ind) => (bs(ind), k, ind)  }
+        // Remove tuples that aren't actually present in the array
+        .filter( _._1 )
+        // Extract the pair (removing the indicator from the tuple)
+        .map( x => (x._2, values(x._3) ) )
+    }
+  } // end of compute
 
 
   /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
+   * @todo finish documenting
    */
-  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
     val cleanF = index.rdd.context.clean(f)
-    val newValues: RDD[ (IndexedSeq[U], BitSet) ] = 
-      index.rdd.zipPartitions(valuesRDD){ 
+    val newValues = index.rdd.zipPartitions(valuesRDD){ 
       (keysIter, valuesIter) => 
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
-       // Allocate the array to store the results into
-      val newValues: Array[U] = new Array[U](oldValues.size)
+      // Allocate the array to store the results into
+      val newBS = new BitSet(oldValues.size)
       // Populate the new Values
       for( (k,i) <- index ) {
-        if (bs(i)) { newValues(i) = f(k, oldValues(i)) }      
+        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) ) 
       }
-      Array((newValues.toIndexedSeq, bs)).iterator
+      Array((oldValues, newBS)).iterator
     }
+    new VertexSetRDD[V](index, newValues)
+  } // end of filter
+
+
+  /**
+   * Pass each vertex attribute through a map function and retain 
+   * the original RDD's partitioning and index.
+   * 
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each value in the RDD
+   * @return a new VertexSet with values obtaind by applying `f` to each of the
+   * entries in the original VertexSet.  The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = 
+      valuesRDD.mapPartitions(iter => iter.map{ 
+        case (values, bs) => 
+          /** 
+           * @todo Consider using a view rather than creating a new array. 
+           * This is already being done for join operations.  It could reduce
+           * memory overhead but require additional recomputation.  
+           */
+          val newValues = new Array[U](values.size)
+          for ( ind <- bs ) {
+            newValues(ind) = f(values(ind))
+          }
+          (newValues.toIndexedSeq, bs)
+          }, preservesPartitioning = true)   
+    new VertexSetRDD[U](index, newValuesRDD)
+  } // end of mapValues
+
+
+  /**
+   * Pass each vertex attribute along with the vertex id through a 
+   * map function and retain the original RDD's partitioning and index.
+   * 
+   * @tparam U the type returned by the map function
+   *
+   * @param f the function applied to each vertex id and vertex 
+   * attribute in the RDD
+   * @return a new VertexSet with values obtaind by applying `f` to each of the
+   * entries in the original VertexSet.  The resulting VertexSetRDD retains the
+   * same index.
+   */
+  def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
+    val cleanF = index.rdd.context.clean(f)
+    val newValues: RDD[ (IndexedSeq[U], BitSet) ] = 
+      index.rdd.zipPartitions(valuesRDD){ 
+        (keysIter, valuesIter) => 
+        val index = keysIter.next()
+        assert(keysIter.hasNext() == false)
+        val (oldValues, bs) = valuesIter.next()
+        assert(valuesIter.hasNext() == false)
+        /** 
+         * @todo Consider using a view rather than creating a new array. 
+         * This is already being done for join operations.  It could reduce
+         * memory overhead but require additional recomputation.  
+         */
+        // Allocate the array to store the results into
+        val newValues: Array[U] = new Array[U](oldValues.size)
+        // Populate the new Values
+        for( (k,i) <- index ) {
+          if (bs(i)) { newValues(i) = f(k, oldValues(i)) }      
+        }
+        Array((newValues.toIndexedSeq, bs)).iterator
+      }
     new VertexSetRDD[U](index, newValues)
-  }
+  } // end of mapValuesWithKeys
 
 
+  /**
+   * Inner join this VertexSet with another VertexSet which has the same Index.  
+   * This function will fail if both VertexSets do not share the same index.  
+   * The resulting vertex set will only contain vertices that are in both this
+   * and the other vertex set.   
+   *
+   * @tparam W the attribute type of the other VertexSet
+   * 
+   * @param other the other VertexSet with which to join.  
+   * @return a VertexSetRDD containing only the vertices in both this and the
+   * other VertexSet and with tuple attributes.
+   *
+   */
   def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter, otherIter) => 
-      val (thisValues, thisBS) = thisIter.next()
-      assert(!thisIter.hasNext)
-      val (otherValues, otherBS) = otherIter.next()
-      assert(!otherIter.hasNext)
-      val newBS = thisBS & otherBS
-      val newValues = thisValues.view.zip(otherValues)
-      Iterator((newValues.toIndexedSeq, newBS))
-    }
+    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = 
+      valuesRDD.zipPartitions(other.valuesRDD){
+        (thisIter, otherIter) => 
+        val (thisValues, thisBS) = thisIter.next()
+        assert(!thisIter.hasNext)
+        val (otherValues, otherBS) = otherIter.next()
+        assert(!otherIter.hasNext)
+        val newBS = thisBS & otherBS
+        val newValues = thisValues.view.zip(otherValues)
+        Iterator((newValues.toIndexedSeq, newBS))
+      }
     new VertexSetRDD(index, newValuesRDD)
   }
 
 
+  /**
+   * Left join this VertexSet with another VertexSet which has the same Index.  
+   * This function will fail if both VertexSets do not share the same index.  
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated  
+   *
+   * @tparam W the attribute type of the other VertexSet
+   * 
+   * @param other the other VertexSet with which to join.  
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for Vertices missing in the other VertexSet.
+   *
+   */
   def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
@@ -195,41 +328,63 @@ class VertexSetRDD[V: ClassManifest](
       Iterator((newValues.toIndexedSeq, thisBS))
     }
     new VertexSetRDD(index, newValuesRDD)
-  }
-
+  } // end of leftZipJoin
 
 
+  /**
+   * Left join this VertexSet with an RDD containing vertex attribute pairs.  
+   * If the other RDD is backed by a VertexSet with the same index than the 
+   * efficient leftZipJoin implementation is used.
+   * The resulting vertex set contains an entry for each vertex in this set.
+   * If the other VertexSet is missing any vertex in this VertexSet then a
+   * `None` attribute is generated  
+   *
+   * @tparam W the attribute type of the other VertexSet
+   * 
+   * @param other the other VertexSet with which to join.
+   * @param merge the function used combine duplicate vertex attributes   
+   * @return a VertexSetRDD containing all the vertices in this VertexSet
+   * with `None` attributes used for Vertices missing in the other VertexSet.
+   *
+   */
   def leftJoin[W: ClassManifest](
     other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
     VertexSetRDD[(V, Option[W]) ] = {
     val cleanMerge = index.rdd.context.clean(merge)
-
+    // Test if the other vertex is a VertexSetRDD to choose the optimal
+    // join strategy
     other match {
+      // If the other set is a VertexSetRDD and shares the same index as
+      // this vertex set then we use the much more efficient leftZipJoin
       case other: VertexSetRDD[_] if index == other.index => {
         leftZipJoin(other)
       }    
       case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
+        // Otherwise we treat the other RDD as a collectiong of 
+        // vertex-attribute pairs.  
+        // If necessary shuffle the other RDD using the partitioner 
+        // for this VertexSet
         val otherShuffled = 
-          if (other.partitioner == Some(partitioner)) other 
-          else other.partitionBy(partitioner)
+          if (other.partitioner == partitioner) other 
+          else other.partitionBy(partitioner.get)
+        // Compute the new values RDD
         val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = 
-          index.rdd.zipPartitions(valuesRDD, other) {
+          index.rdd.zipPartitions(valuesRDD, otherShuffled) {
           (thisIndexIter, thisIter, tuplesIter) =>
+          // Get the Index and values for this RDD
           val index = thisIndexIter.next()
           assert(!thisIndexIter.hasNext)
           val (thisValues, thisBS) = thisIter.next()
           assert(!thisIter.hasNext)
+          // Create a new array to store the values in the resulting VertexSet
           val newW = new Array[W](thisValues.size)
           // track which values are matched with values in other
           val wBS = new BitSet(thisValues.size)
+          // Loop over all the tuples that have vertices in this VertexSet.  
           for( (k, w) <- tuplesIter if index.contains(k) ) {
             val ind = index.get(k)
+            // Not all the vertex ids in the index are in this VertexSet. 
+            // If there is a vertex in this set then record the other value
             if(thisBS(ind)) {
               if(wBS(ind)) {
                 newW(ind) = cleanMerge(newW(ind), w) 
@@ -238,31 +393,33 @@ class VertexSetRDD[V: ClassManifest](
                 wBS(ind) = true
               }
             }
-          }
-
+          } // end of for loop over tuples
+          // Some vertices in this vertex set may not have a corresponding
+          // tuple in the join and so a None value should be returned. 
           val otherOption = newW.view.zipWithIndex
             .map{ (x: (W, Int)) => if(wBS(x._2)) Option(x._1) else None }
+          // the final values is the zip of the values in this RDD along with
+          // the values in the other
           val newValues = thisValues.view.zip(otherOption)
-
           Iterator((newValues.toIndexedSeq, thisBS))
         } // end of newValues
         new VertexSetRDD(index, newValues) 
       }
     }
-  }
+  } // end of leftJoin
 
 
   /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
+   * For each key k in `this` or `other`, return a resulting RDD that contains a 
+   * tuple with the list of values for that key in `this` as well as `other`.
    */
   def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): 
   VertexSetRDD[(Seq[V], Seq[W])] = {
     //RDD[(K, (Seq[V], Seq[W]))] = {
     other match {
       case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same super set of keys
-        // then we simply merge the value RDDs. 
+        // if both RDDs share exactly the same index and therefore the same 
+        // super set of keys then we simply merge the value RDDs. 
         // However it is possible that both RDDs are missing a value for a given key in 
         // which case the returned RDD should have a null value
         val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] = 
@@ -272,10 +429,12 @@ class VertexSetRDD[V: ClassManifest](
             assert(!thisIter.hasNext)
             val (otherValues, otherBS) = otherIter.next()
             assert(!otherIter.hasNext)
-
+            /** 
+             * @todo consider implementing this with a view as in leftJoin to 
+             * reduce array allocations
+             */
             val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
             val newBS = thisBS | otherBS
-
             for( ind <- newBS ) {
               val a = if (thisBS(ind)) Seq(thisValues(ind)) else Seq.empty[V]
               val b = if (otherBS(ind)) Seq(otherValues(ind)) else Seq.empty[W]
@@ -400,12 +559,6 @@ class VertexSetRDD[V: ClassManifest](
                 newValues.append( (Seq.empty[V], ArrayBuffer(w) ) ) 
               }
             }
-            // // Finalize the new values array
-            // val newValuesArray: Seq[Seq[(Seq[V],Seq[W])]] = 
-            //   newValues.view.map{ 
-            //     case null => null
-            //     case (s, ab) => Seq((s, ab.toSeq)) 
-            //     }.toSeq 
             Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
           }).cache()
 
@@ -417,228 +570,37 @@ class VertexSetRDD[V: ClassManifest](
         new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
       }
     }
-  }
-
-
-  // 
-  // def zipJoinToRDD[W: ClassManifest](other: VertexSetRDD[K,W]): RDD[(K,(V,W))] = {
-  //   if(index != other.index) {
-  //     throw new SparkException("ZipJoinRDD can only be applied to RDDs with the same index!")
-  //   }
-  //   index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-  //     (thisIndexIter, thisIter, otherIter) => 
-  //     val index = thisIndexIter.next()
-  //     assert(!thisIndexIter.hasNext)
-  //     val (thisValues, thisBS) = thisIter.next()
-  //     assert(!thisIter.hasNext)
-  //     val (otherValues, otherBS) = otherIter.next()
-  //     assert(!otherIter.hasNext)
-  //     val newBS = thisBS & otherBS
-  //     index.iterator.filter{ case (k,i) => newBS(i) }.map{ 
-  //       case (k,i) => (k, (thisValues(i), otherValues(i)))
-  //     }
-  //   }
-  // }
-
-
-/*  This is probably useful but we are not using it
-  def zipJoinWithKeys[W: ClassManifest, Z: ClassManifest](
-    other: RDD[(K,W)])(
-    f: (K, V, W) => Z, 
-    merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) => 
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          val newBS = thisBS & otherBS
-          for( (k,i) <- index ) {
-            if (newBS(i)) { 
-              newValues(i) = cleanF(k, thisValues(i), otherValues(i))
-            }       
-          }
-          List((newValues, newBS)).iterator
-        }
-        new VertexSetRDD(index, newValues) 
-      }
-    
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled = 
-          if (other.partitioner == Some(partitioner)) other 
-          else other.partitionBy(partitioner)
-
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), w)
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result) 
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          } 
-          List((newValues, tempBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues) 
-      }
-    }
-  }
-*/
-
-/*
-  def zipJoinLeftWithKeys[W: ClassManifest, Z: ClassManifest](
-    other: RDD[(K,W)])(
-    f: (K, V, Option[W]) => Z, 
-    merge: (Z,Z) => Z = (a:Z, b:Z) => a):
-    VertexSetRDD[K,Z] = {
-    val cleanF = index.rdd.context.clean(f)
-    val cleanMerge = index.rdd.context.clean(merge)
-    other match {
-      case other: VertexSetRDD[_, _] if index == other.index => {
-        val newValues = index.rdd.zipPartitions(valuesRDD, other.valuesRDD){
-          (thisIndexIter, thisIter, otherIter) => 
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          val (otherValues, otherBS) = otherIter.next()
-          assert(!otherIter.hasNext)
-          val newValues = new Array[Z](thisValues.size)
-          for( (k,i) <- index ) {
-            if (thisBS(i)) { 
-              val otherVal = if(otherBS(i)) Some(otherValues(i)) else None
-              newValues(i) = cleanF(k, thisValues(i), otherVal)
-            }       
-          }
-          List((newValues, thisBS)).iterator
-        }
-        new VertexSetRDD(index, newValues) 
-      }
-    
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled = 
-          if (other.partitioner == Some(partitioner)) other 
-          else other.partitionBy(partitioner)
-        val newValues = index.rdd.zipPartitions(valuesRDD, other) {
-          (thisIndexIter, thisIter, tuplesIter) =>
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-
-          val newValues = new Array[Z](thisValues.size)
-          // track which values are matched with values in other
-          val tempBS = new BitSet(thisValues.size)
-
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            if(thisBS(ind)) {
-              val result = cleanF(k, thisValues(ind), Option(w))
-              if(tempBS(ind)) {
-                newValues(ind) = cleanMerge(newValues(ind), result) 
-              } else {
-                newValues(ind) = result
-                tempBS(ind) = true
-              }
-            }
-          } 
-
-          // Process remaining keys in lef join
-          for( (k,ind) <- index if thisBS(ind) && !tempBS(ind)) {
-            newValues(ind) = cleanF(k, thisValues(ind), None)
-          }
-          List((newValues, thisBS)).iterator
-        } // end of newValues
-        new VertexSetRDD(index, newValues) 
-      }
-    }
-  }
-
-*/
-
-
-
-  override def filter(f: Tuple2[Vid,V] => Boolean): VertexSetRDD[V] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValues = index.rdd.zipPartitions(valuesRDD){ 
-      (keysIter, valuesIter) => 
-      val index = keysIter.next()
-      assert(keysIter.hasNext() == false)
-      val (oldValues, bs) = valuesIter.next()
-      assert(valuesIter.hasNext() == false)
-      // Allocate the array to store the results into
-      val newBS = new BitSet(oldValues.size)
-      // Populate the new Values
-      for( (k,i) <- index ) {
-        newBS(i) = bs(i) && cleanF( (k, oldValues(i)) ) 
-      }
-      Array((oldValues, newBS)).iterator
-    }
-    new VertexSetRDD[V](index, newValues)
-  }
-
-
-  /**
-   * Provide the RDD[(K,V)] equivalent output. 
-   */
-  override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
-    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => 
-      // Walk the index to construct the key, value pairs
-      indexMap.iterator 
-        // Extract rows with key value pairs and indicators
-        .map{ case (k, ind) => (bs(ind), k, ind)  }
-        // Remove tuples that aren't actually present in the array
-        .filter( _._1 )
-        // Extract the pair (removing the indicator from the tuple)
-        .map( x => (x._2, values(x._3) ) )
-    }
-  }
-
+  } // end of cogroup
 } // End of VertexSetRDD
 
 
 
-
+/**
+ * The VertexSetRDD singleton is used to construct VertexSets
+ */
 object VertexSetRDD {
 
-
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs.  
+   * Duplicate entries are removed arbitrarily.  
+   *
+   * @tparam V the vertex attribute type
+   *
+   * @param rdd the collection of vertex-attribute pairs
+   */
   def apply[V: ClassManifest](rdd: RDD[(Vid,V)]): VertexSetRDD[V] = 
     apply(rdd, (a:V, b:V) => a )
 
+  /**
+   * Construct a vertex set from an RDD of vertex-attribute pairs where
+   * duplicate entries are merged using the reduceFunc
+   *
+   * @tparam V the vertex attribute type
+   * 
+   * @param rdd the collection of vertex-attribute pairs
+   * @param reduceFunc the function used to merge attributes of duplicate
+   * vertices.
+   */
   def apply[V: ClassManifest](
     rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
     // Preaggregate and shuffle if necessary
@@ -669,61 +631,29 @@ object VertexSetRDD {
     val values: RDD[(IndexedSeq[V], BitSet)] = 
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
-  }
-
+  } // end of apply
 
 
+  /**
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest](
     rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] = 
     apply(rdd, index, (a:V,b:V) => a)
 
 
+  /**
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest](
     rdd: RDD[(Vid,V)], index: VertexSetIndex,
     reduceFunc: (V, V) => V): VertexSetRDD[V] = 
     apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
-  // {
-  //   // Get the index Partitioner
-  //   val partitioner = index.rdd.partitioner match {
-  //     case Some(p) => p
-  //     case None => throw new SparkException("An index must have a partitioner.")
-  //   }
-  //   // Preaggregate and shuffle if necessary
-  //   val partitioned = 
-  //     if (rdd.partitioner != Some(partitioner)) {
-  //       // Preaggregation.
-  //       val aggregator = new Aggregator[K, V, V](v => v, reduceFunc, reduceFunc)
-  //       rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
-  //     } else {
-  //       rdd
-  //     }
-
-  //   // Use the index to build the new values table
-  //   val values = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
-  //     // There is only one map
-  //     val index = indexIter.next()
-  //     assert(!indexIter.hasNext())
-  //     val values = new Array[V](index.size)
-  //     val bs = new BitSet(index.size)
-  //     for ((k,v) <- tblIter) {
-  //       if (!index.contains(k)) {
-  //         throw new SparkException("Error: Trying to bind an external index " +
-  //           "to an RDD which contains keys that are not in the index.")
-  //       }
-  //       val ind = index(k)
-  //       if (bs(ind)) { 
-  //         values(ind) = reduceFunc(values(ind), v) 
-  //       } else {
-  //         values(ind) = v
-  //         bs(ind) = true
-  //       }
-  //     }
-  //     List((values, bs)).iterator
-  //   })
-  //   new VertexSetRDD[K,V](index, values)
-  // } // end of apply
-
+  
 
+  /**
+   * @todo finish documenting
+   */
   def apply[V: ClassManifest, C: ClassManifest](
     rdd: RDD[(Vid,V)], 
     index: VertexSetIndex,
@@ -774,6 +704,8 @@ object VertexSetRDD {
 
   /**
    * Construct and index of the unique values in a given RDD.
+   *
+   * @todo finish documenting
    */
   def makeIndex(keys: RDD[Vid], 
     partitioner: Option[Partitioner] = None): VertexSetIndex = {
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 2d74ce92e25efc15ca7a37d95b6c27d9d857ebdf..f2b3d5bdfe26882025b6f3d98b7ecbf19dccf9e4 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -72,40 +72,22 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
-//  test("graph partitioner") {
-//    sc = new SparkContext("local", "test")
-//    val vertices = sc.parallelize(Seq(Vertex(1, "one"), Vertex(2, "two")))
-//    val edges = sc.parallelize(Seq(Edge(1, 2, "onlyedge")))
-//    var g = Graph(vertices, edges)
-//
-//    g = g.withPartitioner(4, 7)
-//    assert(g.numVertexPartitions === 4)
-//    assert(g.numEdgePartitions === 7)
-//
-//    g = g.withVertexPartitioner(5)
-//    assert(g.numVertexPartitions === 5)
-//
-//    g = g.withEdgePartitioner(8)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.mapVertices(x => x)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.mapEdges(x => x)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    val updates = sc.parallelize(Seq((1, " more")))
-//    g = g.updateVertices(
-//      updates,
-//      (v, u: Option[String]) => if (u.isDefined) v.data + u.get else v.data)
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//    g = g.reverse
-//    assert(g.numVertexPartitions === 5)
-//    assert(g.numEdgePartitions === 8)
-//
-//  }
+
+  test("VertexSetRDD") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
+      val b = VertexSetRDD(a).mapValues(x => -x)
+      assert(b.leftJoin(a)
+        .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      val c = VertexSetRDD(a, b.index)
+      assert(b.leftJoin(c)
+        .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      val d = c.filter(q => ((q._2 % 2) == 0))
+      val e = a.filter(q => ((q._2 % 2) == 0))
+      assert(d.count === e.count)
+      assert(b.zipJoin(c).mapValues(x => x._1 + x._2)
+        .map(x => x._2).reduce(_+_) === 0)
+    }
+  } 
+  
 }