diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
index a376d1015a314815ac3cd75e76b1629d544ed62c..e53551ced62ec7ada495a94486a3066643346e53 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
@@ -27,14 +27,21 @@ package org.apache.spark.util.hash
  */
 private[spark]
 class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
-    initialCapacity: Int)
+  val keySet: OpenHashSet[K], var _values: Array[V])
   extends Iterable[(K, V)]
   with Serializable {
 
-  def this() = this(64)
+  /**
+   * Allocate an OpenHashMap with a fixed initial capacity
+   */
+  def this(initialCapacity: Int = 64) = 
+    this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
+
+  /**
+   * Allocate an OpenHashMap backed by an existing key set, with a value array
+   * sized to the key set's capacity
+   */
+  def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
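+  // A sketch of the intended use (hypothetical names): view a pre-built key set
+  // together with a freshly allocated, parallel value array as a map:
+  //   val keys = new OpenHashSet[String](64)
+  //   keys.add("a")
+  //   val m = new OpenHashMap[String, Int](keys)  // m("a") == 0 until a value is assigned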
 
-  protected var _keySet = new OpenHashSet[K](initialCapacity)
-  private var _values = new Array[V](_keySet.capacity)
 
   @transient private var _oldValues: Array[V] = null
 
@@ -42,14 +49,14 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
   private var haveNullValue = false
   private var nullValue: V = null.asInstanceOf[V]
 
-  override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
+  override def size: Int = if (haveNullValue) keySet.size + 1 else keySet.size
 
   /** Get the value for a given key */
   def apply(k: K): V = {
     if (k == null) {
       nullValue
     } else {
-      val pos = _keySet.getPos(k)
+      val pos = keySet.getPos(k)
       if (pos < 0) {
         null.asInstanceOf[V]
       } else {
@@ -64,9 +71,26 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
       haveNullValue = true
       nullValue = v
     } else {
-      val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+      val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
       _values(pos) = v
-      _keySet.rehashIfNeeded(k, grow, move)
+      keySet.rehashIfNeeded(k, grow, move)
+      _oldValues = null
+    }
+  }
+
+  /**
+   * Set the value for a key, merging the new value into any existing value
+   * with `mergeF`. For non-null keys that are not yet present, `mergeF` is
+   * applied to the value array's default element (e.g. 0 or null) and `v`.
+   */
+  def setMerge(k: K, v: V, mergeF: (V, V) => V) {
+    if (k == null) {
+      if (haveNullValue) {
+        nullValue = mergeF(nullValue, v)
+      } else {
+        haveNullValue = true
+        nullValue = v
+      }
+    } else {
+      val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+      _values(pos) = mergeF(_values(pos), v)
+      keySet.rehashIfNeeded(k, grow, move)
       _oldValues = null
     }
   }
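+  // Usage sketch (hypothetical): accumulating counts with setMerge. Because the
+  // merge is applied even on first insertion (against the Int slot default of 0),
+  // summation behaves as expected:
+  //   val counts = new OpenHashMap[String, Int](64)
+  //   counts.setMerge("a", 1, _ + _)  // slot becomes 0 + 1 = 1
+  //   counts.setMerge("a", 2, _ + _)  // slot becomes 1 + 2 = 3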
@@ -87,11 +111,11 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
       }
       nullValue
     } else {
-      val pos = _keySet.fastAdd(k)
+      val pos = keySet.fastAdd(k)
       if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
         val newValue = defaultValue
         _values(pos & OpenHashSet.POSITION_MASK) = newValue
-        _keySet.rehashIfNeeded(k, grow, move)
+        keySet.rehashIfNeeded(k, grow, move)
         newValue
       } else {
         _values(pos) = mergeValue(_values(pos))
@@ -113,9 +137,9 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
         }
         pos += 1
       }
-      pos = _keySet.nextPos(pos)
+      pos = keySet.nextPos(pos)
       if (pos >= 0) {
-        val ret = (_keySet.getValue(pos), _values(pos))
+        val ret = (keySet.getValue(pos), _values(pos))
         pos += 1
         ret
       } else {
@@ -146,3 +170,4 @@ class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V:
     _values(newPos) = _oldValues(oldPos)
   }
 }
+
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
index 7aa3f6220cee38642eb688875b564fb7132c1529..d083ab26ac86850508900af23749b2a95380783e 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
@@ -81,6 +81,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   protected var _data = classManifest[T].newArray(_capacity)
   protected var _bitset = new BitSet(_capacity)
 
+  /** Return the bitset recording which positions in the backing array hold an element. */
+  def getBitSet = _bitset
+
   /** Number of elements in the set. */
   def size: Int = _size
 
@@ -147,6 +149,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   /** Return the value at the specified position. */
   def getValue(pos: Int): T = _data(pos)
 
+  /** Return the value at the specified position, asserting that it holds an element. */
+  def getValueSafe(pos: Int): T = {
+    assert(_bitset.get(pos))
+    _data(pos)
+  }
+
   /**
    * Return the next position with an element stored, starting from the given position inclusively.
    */
diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
index 14c136720788aee3c64507f82cedae87de2a2c11..08fc74e5da6ba72b0baa36ccfa80e91e1b0fa35e 100644
--- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
@@ -28,35 +28,56 @@ package org.apache.spark.util.hash
 private[spark]
 class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
                               @specialized(Long, Int, Double) V: ClassManifest](
-    initialCapacity: Int)
+    val keySet: OpenHashSet[K], var _values: Array[V])
   extends Iterable[(K, V)]
   with Serializable {
 
-  def this() = this(64)
+  /**
+   * Allocate a PrimitiveKeyOpenHashMap with a fixed initial capacity
+   */
+  def this(initialCapacity: Int = 64) = 
+    this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
 
-  require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
+  /**
+   * Allocate a PrimitiveKeyOpenHashMap backed by an existing key set, with a
+   * value array sized to the key set's capacity
+   */
+  def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
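+  // As used in GraphImpl, this constructor wraps an existing vertex-id index and
+  // a parallel attribute array as a map view, e.g.:
+  //   val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)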
 
-  protected var _keySet = new OpenHashSet[K](initialCapacity)
-  private var _values = new Array[V](_keySet.capacity)
+  require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
 
   private var _oldValues: Array[V] = null
 
-  override def size = _keySet.size
+  override def size = keySet.size
 
   /** Get the value for a given key */
   def apply(k: K): V = {
-    val pos = _keySet.getPos(k)
+    val pos = keySet.getPos(k)
     _values(pos)
   }
 
   /** Set the value for a key */
   def update(k: K, v: V) {
-    val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
+    val pos = keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
     _values(pos) = v
-    _keySet.rehashIfNeeded(k, grow, move)
+    keySet.rehashIfNeeded(k, grow, move)
+    _oldValues = null
+  }
+
+
+  /**
+   * Set the value for a key, merging the new value into any existing value
+   * with `mergeF`. The merge is applied only when the key is already present;
+   * on first insertion the value is simply stored.
+   */
+  def setMerge(k: K, v: V, mergeF: (V, V) => V) {
+    val pos = keySet.fastAdd(k)
+    val ind = pos & OpenHashSet.POSITION_MASK
+    if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) { // the key was newly added
+      _values(ind) = v
+    } else {
+      _values(ind) = mergeF(_values(ind), v)
+    }
+    keySet.rehashIfNeeded(k, grow, move)
     _oldValues = null
   }
 
+
   /**
    * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
    * set its value to mergeValue(oldValue).
@@ -64,11 +85,11 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
    * @return the newly updated value.
    */
   def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
-    val pos = _keySet.fastAdd(k)
+    val pos = keySet.fastAdd(k)
     if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
       val newValue = defaultValue
       _values(pos & OpenHashSet.POSITION_MASK) = newValue
-      _keySet.rehashIfNeeded(k, grow, move)
+      keySet.rehashIfNeeded(k, grow, move)
       newValue
     } else {
       _values(pos) = mergeValue(_values(pos))
@@ -82,9 +103,9 @@ class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
 
     /** Get the next value we should return from next(), or null if we're finished iterating */
     def computeNextPair(): (K, V) = {
-      pos = _keySet.nextPos(pos)
+      pos = keySet.nextPos(pos)
       if (pos >= 0) {
-        val ret = (_keySet.getValue(pos), _values(pos))
+        val ret = (keySet.getValue(pos), _values(pos))
         pos += 1
         ret
       } else {
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 821063e1f811bd4f86a0a636381cbf6048f39e28..62f445127c5681eb8b76e6882e966f7e38409bde 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -1,14 +1,11 @@
 package org.apache.spark.graph
 
-import org.apache.spark.util.hash.BitSet
-
-
 import com.esotericsoftware.kryo.Kryo
 
 import org.apache.spark.graph.impl.MessageToPartition
 import org.apache.spark.serializer.KryoRegistrator
 import org.apache.spark.graph.impl._
-import scala.collection.mutable.BitSet
+import org.apache.spark.util.hash.BitSet
 
 class GraphKryoRegistrator extends KryoRegistrator {
 
@@ -20,7 +17,6 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[EdgePartition[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
-
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 8acc89a95b8f1f0daca39042a03ef07638f50a5e..f26e28600303314e129bc498cbd1c2721db3a3bf 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -31,6 +31,8 @@ import org.apache.spark.Partitioner._
 
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
 
 
 
@@ -60,7 +62,6 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
 } // end of VertexSetIndex
 
 
-
 /**
  * An VertexSetRDD[V] extends the RDD[(Vid,V)] by ensuring that there
  * is only one entry for each vertex and by pre-indexing the entries
@@ -97,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
  */
 class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index:  VertexSetIndex,
-    @transient val valuesRDD: RDD[ (IndexedSeq[V], BitSet) ])
+    @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context, 
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
@@ -160,15 +161,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * Provide the RDD[(K,V)] equivalent output. 
    */
   override def compute(part: Partition, context: TaskContext): Iterator[(Vid, V)] = {
-    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) => 
-      // Walk the index to construct the key, value pairs
-      indexMap.iterator 
-        // Extract rows with key value pairs and indicators
-        .map{ case (k, ind) => (bs.get(ind), k, ind)  }
-        // Remove tuples that aren't actually present in the array
-        .filter( _._1 )
-        // Extract the pair (removing the indicator from the tuple)
-        .map( x => (x._2, values(x._3) ) )
+    tuples.compute(part, context).flatMap { case (indexMap, (values, bs) ) =>
+      bs.iterator.map(ind => (indexMap.getValueSafe(ind), values(ind)))
     }
   } // end of compute
 
@@ -188,18 +182,22 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanPred = index.rdd.context.clean(pred)
     val newValues = index.rdd.zipPartitions(valuesRDD){ 
       (keysIter: Iterator[VertexIdToIndexMap], 
-       valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => 
+       valuesIter: Iterator[(Int => V, BitSet)]) => 
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
       assert(valuesIter.hasNext() == false)
       // Allocate the array to store the results into
-      val newBS = new BitSet(oldValues.size)
-      // Populate the new Values
-      for( (k,i) <- index ) {
-        if( bs.get(i) && cleanPred( (k, oldValues(i)) ) ) {
-          newBS.set(i)
+      val newBS = new BitSet(index.capacity)
+      // Iterate over the active bits in the old bitset and 
+      // evaluate the predicate
+      var ind = bs.nextSetBit(0)
+      while(ind >= 0) {
+        val k = index.getValueSafe(ind)
+        if( cleanPred( (k, oldValues(ind)) ) ) {
+          newBS.set(ind)
         }
+        ind = bs.nextSetBit(ind+1)
       }
       Array((oldValues, newBS)).iterator
     }
@@ -219,32 +217,13 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValuesRDD: RDD[ (IndexedSeq[U], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => U, BitSet) ] = 
       valuesRDD.mapPartitions(iter => iter.map{ 
         case (values, bs: BitSet) => 
-
-          /** 
-           * @todo Consider using a view rather than creating a new
-           * array.  This is already being done for join operations.
-           * It could reduce memory overhead but require additional
-           * recomputation.
-           */
-          val newValues = new Array[U](values.size)
-          var ind = bs.nextSetBit(0)
-          while(ind >= 0) {
-            // if(ind >= newValues.size) {
-            //   println(ind)
-            //   println(newValues.size)
-            //   bs.iterator.foreach(print(_))
-            // }
-            // assert(ind < newValues.size)
-            // assert(ind < values.size)
-            newValues(ind) = cleanF(values(ind))
-            ind = bs.nextSetBit(ind+1)
-          }
-          (newValues.toIndexedSeq, bs)
-          }, preservesPartitioning = true)   
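+          // Represent the transformed values as a function of position (a lazy
+          // view) instead of materializing a new array; positions not set in the
+          // bitset yield the value type's null/default.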
+          val newValues: (Int => U) = 
+            (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
+          (newValues, bs)
+      }, preservesPartitioning = true)   
     new VertexSetRDD[U](index, newValuesRDD)
   } // end of mapValues
 
@@ -262,27 +241,20 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
-    val cleanF = index.rdd.context.clean(f)
-    val newValues: RDD[ (IndexedSeq[U], BitSet) ] = 
+    val newValues: RDD[ (Int => U, BitSet) ] = 
       index.rdd.zipPartitions(valuesRDD){ 
         (keysIter: Iterator[VertexIdToIndexMap], 
-         valuesIter: Iterator[(IndexedSeq[V], BitSet)]) => 
+         valuesIter: Iterator[(Int => V, BitSet)]) => 
         val index = keysIter.next()
         assert(keysIter.hasNext() == false)
         val (oldValues, bs: BitSet) = valuesIter.next()
         assert(valuesIter.hasNext() == false)
-        /** 
-         * @todo Consider using a view rather than creating a new array. 
-         * This is already being done for join operations.  It could reduce
-         * memory overhead but require additional recomputation.  
-         */
-        // Allocate the array to store the results into
-        val newValues: Array[U] = new Array[U](oldValues.size)
-        // Populate the new Values
-        for( (k,i) <- index ) {
-          if (bs.get(i)) { newValues(i) = cleanF(k, oldValues(i)) }      
+        // Construct a view of the map transformation
+        val newValues: (Int => U) = (ind: Int) => {
+          if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
+          else { null.asInstanceOf[U] }
         }
-        Array((newValues.toIndexedSeq, bs)).iterator
+        Iterator((newValues, bs))
       }
     new VertexSetRDD[U](index, newValues)
   } // end of mapValuesWithKeys
@@ -305,17 +277,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,W)], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] = 
       valuesRDD.zipPartitions(other.valuesRDD){
-        (thisIter: Iterator[(IndexedSeq[V], BitSet)], 
-          otherIter: Iterator[(IndexedSeq[W], BitSet)]) => 
+        (thisIter: Iterator[(Int => V, BitSet)], 
+          otherIter: Iterator[(Int => W, BitSet)]) => 
         val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
         val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val newBS: BitSet = thisBS & otherBS
-        val newValues = thisValues.view.zip(otherValues)
-        Iterator((newValues.toIndexedSeq, newBS))
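+        // The joined values are again a function view over positions; only
+        // positions set in both bitsets (newBS) are treated as present.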
+        val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind)) 
+        Iterator((newValues, newBS))
       }
     new VertexSetRDD(index, newValuesRDD)
   }
@@ -340,18 +312,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = 
+    val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
       valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter: Iterator[(IndexedSeq[V], BitSet)], 
-        otherIter: Iterator[(IndexedSeq[W], BitSet)]) => 
+      (thisIter: Iterator[(Int => V, BitSet)], 
+        otherIter: Iterator[(Int => W, BitSet)]) => 
       val (thisValues, thisBS: BitSet) = thisIter.next()
       assert(!thisIter.hasNext)
       val (otherValues, otherBS: BitSet) = otherIter.next()
       assert(!otherIter.hasNext)
-      val otherOption = otherValues.view.zipWithIndex
-        .map{ (x: (W, Int)) => if(otherBS.get(x._2)) Option(x._1) else None }
-      val newValues = thisValues.view.zip(otherOption)
-      Iterator((newValues.toIndexedSeq, thisBS))
+      val newValues: Int => (V, Option[W]) = (ind: Int) => 
+        (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
+      Iterator((newValues, thisBS))
     }
     new VertexSetRDD(index, newValuesRDD)
   } // end of leftZipJoin
@@ -378,7 +349,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   def leftJoin[W: ClassManifest](
     other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
     VertexSetRDD[(V, Option[W]) ] = {
-    val cleanMerge = index.rdd.context.clean(merge)
     // Test if the other vertex is a VertexSetRDD to choose the optimal
     // join strategy
     other match {
@@ -396,10 +366,10 @@ class VertexSetRDD[@specialized V: ClassManifest](
           if (other.partitioner == partitioner) other 
           else other.partitionBy(partitioner.get)
         // Compute the new values RDD
-        val newValues: RDD[ (IndexedSeq[(V,Option[W])], BitSet) ] = 
+        val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
           index.rdd.zipPartitions(valuesRDD, otherShuffled) {
           (thisIndexIter: Iterator[VertexIdToIndexMap], 
-            thisIter: Iterator[(IndexedSeq[V], BitSet)], 
+            thisIter: Iterator[(Int => V, BitSet)], 
             tuplesIter: Iterator[(Vid,W)]) =>
           // Get the Index and values for this RDD
           val index = thisIndexIter.next()
@@ -407,33 +377,32 @@ class VertexSetRDD[@specialized V: ClassManifest](
           val (thisValues, thisBS) = thisIter.next()
           assert(!thisIter.hasNext)
           // Create a new array to store the values in the resulting VertexSet
-          val newW = new Array[W](thisValues.size)
+          val otherValues = new Array[W](index.capacity)
           // track which values are matched with values in other
-          val wBS = new BitSet(thisValues.size)
-          // Loop over all the tuples that have vertices in this VertexSet.  
-          for( (k, w) <- tuplesIter if index.contains(k) ) {
-            val ind = index.get(k)
-            // Not all the vertex ids in the index are in this VertexSet. 
-            // If there is a vertex in this set then record the other value
-            if(thisBS.get(ind)) {
-              if(wBS.get(ind)) {
-                newW(ind) = cleanMerge(newW(ind), w) 
-              } else {
-                newW(ind) = w
-                wBS.set(ind) 
+          val otherBS = new BitSet(index.capacity)
+          for ((k,w) <- tuplesIter) {
+            // Get the location of the key in the index
+            val pos = index.getPos(k)
+            // Only if the key is already in the index
+            if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
+              // Get the actual index
+              val ind = pos & OpenHashSet.POSITION_MASK
+              // If this value has already been seen then merge
+              if (otherBS.get(ind)) {
+                otherValues(ind) = merge(otherValues(ind), w)
+              } else { // otherwise just store the new value
+                otherBS.set(ind)
+                otherValues(ind) = w
               }
             }
-          } // end of for loop over tuples
+          }
           // Some vertices in this vertex set may not have a corresponding
           // tuple in the join and so a None value should be returned. 
-          val otherOption = newW.view.zipWithIndex
-            .map{ (x: (W, Int)) => if(wBS.get(x._2)) Option(x._1) else None }
-          // the final values is the zip of the values in this RDD along with
-          // the values in the other
-          val newValues = thisValues.view.zip(otherOption)
-          Iterator((newValues.toIndexedSeq, thisBS))
+          val newValues: Int => (V, Option[W]) = (ind: Int) => 
+            (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
+          Iterator((newValues, thisBS))
         } // end of newValues
-        new VertexSetRDD(index, newValues) 
+        new VertexSetRDD(index, newValuesRDD) 
       }
     }
   } // end of leftJoin
@@ -443,6 +412,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * For each key k in `this` or `other`, return a resulting RDD that contains a 
    * tuple with the list of values for that key in `this` as well as `other`.
    */
+   /*
   def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner): 
   VertexSetRDD[(Seq[V], Seq[W])] = {
     //RDD[(K, (Seq[V], Seq[W]))] = {
@@ -489,16 +459,17 @@ class VertexSetRDD[@specialized V: ClassManifest](
             assert(!thisIter.hasNext)
             val otherIndex = otherIter.next()
             assert(!otherIter.hasNext)
-            val newIndex = new VertexIdToIndexMap()
-            // @todo Merge only the keys that correspond to non-null values
             // Merge the keys
-            newIndex.putAll(thisIndex)
-            newIndex.putAll(otherIndex)
-            // We need to rekey the index
-            var ctr = 0
-            for (e <- newIndex.entrySet) {
-              e.setValue(ctr)
-              ctr += 1
+            val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
+            var ind = thisIndex.nextPos(0)
+            while(ind >= 0) {
+              newIndex.fastAdd(thisIndex.getValue(ind))
+              ind = thisIndex.nextPos(ind+1)
+            }
+            ind = otherIndex.nextPos(0)
+            while(ind >= 0) {
+              newIndex.fastAdd(otherIndex.getValue(ind))
+              ind = otherIndex.nextPos(ind+1)
             }
             List(newIndex).iterator
           }).cache()
@@ -604,7 +575,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
       }
     }
   } // end of cogroup
-
+ */
 
 } // End of VertexSetRDD
 
@@ -649,25 +620,18 @@ object VertexSetRDD {
     } 
 
     val groups = preAgg.mapPartitions( iter => {
-      val indexMap = new VertexIdToIndexMap()
-      val values = new ArrayBuffer[V]
+      val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
       for ((k,v) <- iter) {
-        if(!indexMap.contains(k)) {
-          val ind = indexMap.size
-          indexMap.put(k, ind)
-          values.append(v)
-        } else {
-          val ind = indexMap.get(k)
-          values(ind) = reduceFunc(values(ind), v)
-        }
+        hashMap.setMerge(k, v, reduceFunc)
       }
-      val bs = new BitSet(indexMap.size)
-      bs.setUntil(indexMap.size)
-      Iterator( (indexMap, (values.toIndexedSeq, bs)) )
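+      // Expose the hash map's internals directly: its key set becomes the index,
+      // its bitset marks the occupied positions, and its value array (accessed
+      // through a function view) supplies the values for this partition.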
+      val index = hashMap.keySet
+      val values: Int => V = (ind: Int) => hashMap._values(ind)
+      val bs = index.getBitSet
+      Iterator( (index, (values, bs)) )
       }, true).cache
     // extract the index and the values
     val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
-    val values: RDD[(IndexedSeq[V], BitSet)] = 
+    val values: RDD[(Int => V, BitSet)] = 
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply
@@ -743,27 +707,31 @@ object VertexSetRDD {
       }
 
     // Use the index to build the new values table
-    val values: RDD[ (IndexedSeq[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+    val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
       // There is only one map
       val index = indexIter.next()
       assert(!indexIter.hasNext())
-      val values = new Array[C](index.size)
-      val bs = new BitSet(index.size)
+      val values = new Array[C](index.capacity)
+      val bs = new BitSet(index.capacity)
       for ((k,c) <- tblIter) {
-        // @todo this extra check may be costing us a lot!
-        if (!index.contains(k)) {
+        // Get the location of the key in the index
+        val pos = index.getPos(k)
+        if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
           throw new SparkException("Error: Trying to bind an external index " +
             "to an RDD which contains keys that are not in the index.")
-        }
-        val ind = index(k)
-        if (bs.get(ind)) { 
-          values(ind) = mergeCombiners(values(ind), c) 
         } else {
-          values(ind) = c
-          bs.set(ind)
+          // Get the actual index
+          val ind = pos & OpenHashSet.POSITION_MASK
+          // If this value has already been seen then merge
+          if (bs.get(ind)) {
+            values(ind) = mergeCombiners(values(ind), c)
+          } else { // otherwise just store the new value
+            bs.set(ind)
+            values(ind) = c
+          }
         }
       }
-      Iterator((values, bs))
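+      // Wrap the materialized array in a function view to match the
+      // (Int => C, BitSet) values representation used by VertexSetRDD.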
+      Iterator(((ind: Int) => values(ind), bs))
     })
     new VertexSetRDD(index, values)
   } // end of apply
@@ -792,14 +760,9 @@ object VertexSetRDD {
     }
 
     val index = shuffledTbl.mapPartitions( iter => {
-      val indexMap = new VertexIdToIndexMap()
-      for ( (k,_) <- iter ){
-        if(!indexMap.contains(k)){
-          val ind = indexMap.size
-          indexMap.put(k, ind)   
-        }
-      }
-      Iterator(indexMap)
+      val index = new VertexIdToIndexMap
+      for ( (k,_) <- iter ){ index.add(k) }
+      Iterator(index)
       }, true).cache
     new VertexSetIndex(index)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d545ba5d1b7b5ecf7650d84a684a75cd82201e94..0418305a4de8f79dc59dc88cebe4829a98ec1fc2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -5,7 +5,6 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.ArrayBuilder
-import scala.collection.mutable.BitSet
 
 
 import org.apache.spark.SparkContext._
@@ -21,6 +20,12 @@ import org.apache.spark.graph._
 import org.apache.spark.graph.impl.GraphImpl._
 import org.apache.spark.graph.impl.MessageToPartitionRDDFunctions._
 
+import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
+
+
+
 /**
  * The Iterator type returned when constructing edge triplets
  */
@@ -31,15 +36,16 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
 
   private var pos = 0
   private val et = new EdgeTriplet[VD, ED]
+  private val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
   
   override def hasNext: Boolean = pos < edgePartition.size
   override def next() = {
     et.srcId = edgePartition.srcIds(pos)
     // assert(vmap.containsKey(e.src.id))
-    et.srcAttr = vertexArray(vidToIndex(et.srcId))
+    et.srcAttr = vmap(et.srcId)
     et.dstId = edgePartition.dstIds(pos)
     // assert(vmap.containsKey(e.dst.id))
-    et.dstAttr = vertexArray(vidToIndex(et.dstId))
+    et.dstAttr = vmap(et.dstId)
     et.attr = edgePartition.data(pos)
     pos += 1
     et
@@ -51,10 +57,10 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
     for (i <- (0 until edgePartition.size)) {
       currentEdge.srcId = edgePartition.srcIds(i)
       // assert(vmap.containsKey(e.src.id))
-      currentEdge.srcAttr = vertexArray(vidToIndex(currentEdge.srcId))
+      currentEdge.srcAttr = vmap(currentEdge.srcId)
       currentEdge.dstId = edgePartition.dstIds(i)
       // assert(vmap.containsKey(e.dst.id))
-      currentEdge.dstAttr = vertexArray(vidToIndex(currentEdge.dstId))
+      currentEdge.dstAttr = vmap(currentEdge.dstId)
       currentEdge.attr = edgePartition.data(i)
       lb += currentEdge
     }
@@ -63,23 +69,6 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
 } // end of Edge Triplet Iterator
 
 
-
-object EdgeTripletBuilder {
-  def makeTriplets[VD: ClassManifest, ED: ClassManifest]( 
-    localVidMap: RDD[(Pid, VertexIdToIndexMap)],
-    vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
-    eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
-    localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
-      (vidMapIter, replicatedValuesIter, eTableIter) =>
-      val (_, vidToIndex) = vidMapIter.next()
-      val (_, vertexArray) = replicatedValuesIter.next()
-      val (_, edgePartition) = eTableIter.next()
-      new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
-    }
-  }
-}
-
-
 /**
  * A Graph RDD that supports computation on graphs.
  */
@@ -90,6 +79,10 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
   extends Graph[VD, ED] {
 
+  // Zero-argument constructor (likely needed for reflective instantiation,
+  // e.g. by a serialization framework; an assumption, not stated in this change).
+  def this() = this(null, null, null, null)
+
   /**
    * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
    * vertex data after it is replicated. Within each partition, it holds a map
@@ -115,7 +108,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   /** Return a RDD that brings edges with its source and destination vertices together. */
   @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
-    EdgeTripletBuilder.makeTriplets(localVidMap, vTableReplicatedValues, eTable)
+    makeTriplets(localVidMap, vTableReplicatedValues, eTable)
 
 
   override def cache(): Graph[VD, ED] = {
@@ -219,24 +212,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   }
 
 
-  override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2):
-    Graph[VD, ED2] = {
-    val newETable = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ 
-      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
-      val (pid, edgePartition) = edgePartitionIter.next()
-      val (_, vidToIndex) = vidToIndexIter.next()
-      val (_, vertexArray) = vertexArrayIter.next()
-      val et = new EdgeTriplet[VD, ED]
-      val newEdgePartition = edgePartition.map{e =>
-        et.set(e)
-        et.srcAttr = vertexArray(vidToIndex(e.srcId))
-        et.dstAttr = vertexArray(vidToIndex(e.dstId))
-        f(et)
-      }
-      Iterator((pid, newEdgePartition))
-    }
-    new GraphImpl(vTable, vid2pid, localVidMap, newETable)
-  }
+  override def mapTriplets[ED2: ClassManifest](f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] =
+    GraphImpl.mapTriplets(this, f)
 
 
   override def subgraph(epred: EdgeTriplet[VD,ED] => Boolean = (x => true), 
@@ -330,57 +307,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   override def mapReduceTriplets[A: ClassManifest](
       mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
       reduceFunc: (A, A) => A)
-    : VertexSetRDD[A] = {
-
-    ClosureCleaner.clean(mapFunc)
-    ClosureCleaner.clean(reduceFunc)
-
-    // Map and preaggregate 
-    val preAgg = eTable.zipPartitions(localVidMap, vTableReplicatedValues){ 
-      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
-      val (pid, edgePartition) = edgePartitionIter.next()
-      val (_, vidToIndex) = vidToIndexIter.next()
-      val (_, vertexArray) = vertexArrayIter.next()
-      // We can reuse the vidToIndex map for aggregation here as well.
-      /** @todo Since this has the downside of not allowing "messages" to arbitrary
-       * vertices we should consider just using a fresh map.
-       */
-      val msgArray = new Array[A](vertexArray.size)
-      val msgBS = new BitSet(vertexArray.size)
-      // Iterate over the partition
-      val et = new EdgeTriplet[VD, ED]
-      edgePartition.foreach{e => 
-        et.set(e)
-        et.srcAttr = vertexArray(vidToIndex(e.srcId))
-        et.dstAttr = vertexArray(vidToIndex(e.dstId))
-        mapFunc(et).foreach{ case (vid, msg) =>
-          // verify that the vid is valid
-          assert(vid == et.srcId || vid == et.dstId)
-          val ind = vidToIndex(vid)
-          // Populate the aggregator map
-          if(msgBS(ind)) {
-            msgArray(ind) = reduceFunc(msgArray(ind), msg)
-          } else { 
-            msgArray(ind) = msg
-            msgBS(ind) = true
-          }
-        }
-      }
-      // Return the aggregate map
-      vidToIndex.long2IntEntrySet().fastIterator()
-      // Remove the entries that did not receive a message
-      .filter{ entry => msgBS(entry.getValue()) }
-      // Construct the actual pairs
-      .map{ entry => 
-        val vid = entry.getLongKey()
-        val ind = entry.getValue()
-        val msg = msgArray(ind)
-        (vid, msg)
-      }
-      }.partitionBy(vTable.index.rdd.partitioner.get)
-    // do the final reduction reusing the index map
-    VertexSetRDD(preAgg, vTable.index, reduceFunc)
-  }
+    : VertexSetRDD[A] = 
+    GraphImpl.mapReduceTriplets(this, mapFunc, reduceFunc)
 
 
   override def outerJoinVertices[U: ClassManifest, VD2: ClassManifest]
@@ -436,7 +364,6 @@ object GraphImpl {
   }
 
 
-
   /**
    * Create the edge table RDD, which is much more efficient for Java heap storage than the
    * normal edges data structure (RDD[(Vid, Vid, ED)]).
@@ -493,16 +420,9 @@ object GraphImpl {
     RDD[(Pid, VertexIdToIndexMap)] = {
     eTable.mapPartitions( _.map{ case (pid, epart) =>
       val vidToIndex = new VertexIdToIndexMap
-      var i = 0
       epart.foreach{ e => 
-        if(!vidToIndex.contains(e.srcId)) {
-          vidToIndex.put(e.srcId, i)
-          i += 1
-        }
-        if(!vidToIndex.contains(e.dstId)) {
-          vidToIndex.put(e.dstId, i)
-          i += 1
-        }
+        vidToIndex.add(e.srcId)
+        vidToIndex.add(e.dstId)
       }
       (pid, vidToIndex)
     }, preservesPartitioning = true).cache()
@@ -527,9 +447,9 @@ object GraphImpl {
       val (pid, vidToIndex) = mapIter.next()
       assert(!mapIter.hasNext)
       // Populate the vertex array using the vidToIndex map
-      val vertexArray = new Array[VD](vidToIndex.size)
+      val vertexArray = new Array[VD](vidToIndex.capacity)
       for (msg <- msgsIter) {
-        val ind = vidToIndex(msg.data._1)
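+        // Find the vertex id's slot in the index and mask to the position bits.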
+        val ind = vidToIndex.getPos(msg.data._1) & OpenHashSet.POSITION_MASK
         vertexArray(ind) = msg.data._2
       }
       Iterator((pid, vertexArray))
@@ -539,6 +459,95 @@ object GraphImpl {
   }
 
 
+  def makeTriplets[VD: ClassManifest, ED: ClassManifest]( 
+    localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+    vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
+    eTable: RDD[(Pid, EdgePartition[ED])]): RDD[EdgeTriplet[VD, ED]] = {
+    localVidMap.zipPartitions(vTableReplicatedValues, eTable) {
+      (vidMapIter, replicatedValuesIter, eTableIter) =>
+      val (_, vidToIndex) = vidMapIter.next()
+      val (_, vertexArray) = replicatedValuesIter.next()
+      val (_, edgePartition) = eTableIter.next()
+      new EdgeTripletIterator(vidToIndex, vertexArray, edgePartition)
+    }
+  }
+
+
+  def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
+    g: GraphImpl[VD, ED],   
+    f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
+    val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){ 
+      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+      val (pid, edgePartition) = edgePartitionIter.next()
+      val (_, vidToIndex) = vidToIndexIter.next()
+      val (_, vertexArray) = vertexArrayIter.next()
+      val et = new EdgeTriplet[VD, ED]
+      val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+      val newEdgePartition = edgePartition.map{e =>
+        et.set(e)
+        et.srcAttr = vmap(e.srcId)
+        et.dstAttr = vmap(e.dstId)
+        f(et)
+      }
+      Iterator((pid, newEdgePartition))
+    }
+    new GraphImpl(g.vTable, g.vid2pid, g.localVidMap, newETable)
+  }
+
+
+  def mapReduceTriplets[VD: ClassManifest, ED: ClassManifest, A: ClassManifest](
+    g: GraphImpl[VD, ED],
+    mapFunc: EdgeTriplet[VD, ED] => Array[(Vid, A)],
+    reduceFunc: (A, A) => A): VertexSetRDD[A] = {
+
+    ClosureCleaner.clean(mapFunc)
+    ClosureCleaner.clean(reduceFunc)
+
+    // Map and preaggregate 
+    val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){ 
+      (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
+      val (pid, edgePartition) = edgePartitionIter.next()
+      val (_, vidToIndex) = vidToIndexIter.next()
+      val (_, vertexArray) = vertexArrayIter.next()
+      assert(!edgePartitionIter.hasNext)
+      assert(!vidToIndexIter.hasNext)
+      assert(!vertexArrayIter.hasNext)
+      assert(vidToIndex.capacity == vertexArray.size)
+      val vmap = new PrimitiveKeyOpenHashMap[Vid, VD](vidToIndex, vertexArray)
+      // We can reuse the vidToIndex map for aggregation here as well.
+      /** @todo Since this has the downside of not allowing "messages" to arbitrary
+       * vertices we should consider just using a fresh map.
+       */
+      val msgArray = new Array[A](vertexArray.size)
+      val msgBS = new BitSet(vertexArray.size)
+      // Iterate over the partition
+      val et = new EdgeTriplet[VD, ED]
+      edgePartition.foreach{e => 
+        et.set(e)
+        et.srcAttr = vmap(e.srcId)
+        et.dstAttr = vmap(e.dstId)
+        mapFunc(et).foreach{ case (vid, msg) =>
+          // verify that the vid is valid
+          assert(vid == et.srcId || vid == et.dstId)
+          // Get the index of the key
+          val ind = vidToIndex.getPos(vid) & OpenHashSet.POSITION_MASK
+          // Populate the aggregator map
+          if(msgBS.get(ind)) {
+            msgArray(ind) = reduceFunc(msgArray(ind), msg)
+          } else { 
+            msgArray(ind) = msg
+            msgBS.set(ind)
+          }
+        }
+      }
+      // construct an iterator of tuples Iterator[(Vid, A)]
+      msgBS.iterator.map( ind => (vidToIndex.getValue(ind), msgArray(ind)) )
+    }.partitionBy(g.vTable.index.rdd.partitioner.get)
+    // do the final reduction reusing the index map
+    VertexSetRDD(preAgg, g.vTable.index, reduceFunc)
+  }
+
+
   protected def edgePartitionFunction1D(src: Vid, dst: Vid, numParts: Pid): Pid = {
     val mixingPrime: Vid = 1125899906842597L 
     (math.abs(src) * mixingPrime).toInt % numParts
diff --git a/graph/src/main/scala/org/apache/spark/graph/package.scala b/graph/src/main/scala/org/apache/spark/graph/package.scala
index 4627c3566ca192f675c4129cb23a2ff0398f727b..37a4fb4a5e1498d34f95f929a79feb94285f5e0b 100644
--- a/graph/src/main/scala/org/apache/spark/graph/package.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/package.scala
@@ -1,5 +1,10 @@
 package org.apache.spark
 
+import org.apache.spark.util.hash.BitSet
+import org.apache.spark.util.hash.OpenHashSet
+import org.apache.spark.util.hash.PrimitiveKeyOpenHashMap
+
+
 package object graph {
 
   type Vid = Long
@@ -8,8 +13,9 @@ package object graph {
   type VertexHashMap[T] = it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap[T]
   type VertexSet = it.unimi.dsi.fastutil.longs.LongOpenHashSet
   type VertexArrayList = it.unimi.dsi.fastutil.longs.LongArrayList
-  // @todo replace with rxin's fast hashmap
-  type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+  
+  //  type VertexIdToIndexMap = it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap
+  type VertexIdToIndexMap = OpenHashSet[Vid]
 
   /**
    * Return the default null-like value for a data type T.
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index f2b3d5bdfe26882025b6f3d98b7ecbf19dccf9e4..2067b1613ef8de61ba57133b59432350b38a0ee0 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -77,16 +77,15 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     withSpark(new SparkContext("local", "test")) { sc =>
       val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
       val b = VertexSetRDD(a).mapValues(x => -x)
-      assert(b.leftJoin(a)
-        .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.count === 101)
+      assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
       val c = VertexSetRDD(a, b.index)
-      assert(b.leftJoin(c)
-        .mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
       val d = c.filter(q => ((q._2 % 2) == 0))
       val e = a.filter(q => ((q._2 % 2) == 0))
       assert(d.count === e.count)
-      assert(b.zipJoin(c).mapValues(x => x._1 + x._2)
-        .map(x => x._2).reduce(_+_) === 0)
+      assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
+
     }
   }