diff --git a/README.md b/README.md
index 7790139c8ffd93a95e16d11b13b86dbc63098ecc..5b06d822252296911bfbcfdcaf143afbc9040056 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@ the challenges of graph construction and transformation and provide
 limited fault-tolerance and support for interactive analysis.
 
 <p align="center">
-  <img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/data_parallel_vs_graph_parallel.png" />
+  <img src="https://raw.github.com/amplab/graphx/master/docs/img/data_parallel_vs_graph_parallel.png" />
 </p>
 
 
@@ -47,7 +47,7 @@ Finally, by exploiting the Scala foundation of Spark, we enable users
 to interactively load, transform, and compute on massive graphs.
 
 <p align="center">
-  <img src="https://raw.github.com/jegonzal/graphx/Documentation/docs/img/tables_and_graphs.png" />
+  <img src="https://raw.github.com/amplab/graphx/master/docs/img/tables_and_graphs.png" />
 </p>
 
 ## Examples
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
index 5e264b48ddc223906a03d2659ec623041487507e..1f794379f74f34d834c9bc4720380ab4d2f07386 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -41,9 +41,11 @@ class BitSet(numBits: Int) {
     val wordIndex = bitIndex >> 6 // divide by 64
     var i = 0
     while(i < wordIndex) { words(i) = -1; i += 1 }
-    // Set the remaining bits
-    val mask = ~(-1L << (bitIndex & 0x3f))
-    words(wordIndex) |= mask
+    if(wordIndex < words.size) {
+      // Set the remaining bits (note that the mask could still be zero)
+      val mask = ~(-1L << (bitIndex & 0x3f))
+      words(wordIndex) |= mask
+    }
   }
 
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
index 82b9198e432c728179de848b2b8b1672e7dbbc4d..baf8099556c061087922acc363cda0759994f86f 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphKryoRegistrator.scala
@@ -18,6 +18,7 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[EdgePartition[Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
+    kryo.register(classOf[VertexAttributeBlock[Object]])
     // This avoids a large number of hash table lookups.
     kryo.setReferences(false)
   }
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index 62608e506d85b820275d9a5a627ddb7c05cf7173..401d5f29bc134e04b2a8c036f7eeb8dad9e2b314 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -25,6 +25,7 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
 import org.apache.spark.graph.impl.AggregationMsg
 import org.apache.spark.graph.impl.MsgRDDFunctions._
 
+
 /**
  * The `VertexSetIndex` maintains the per-partition mapping from
  * vertex id to the corresponding location in the per-partition values
@@ -88,7 +89,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   extends RDD[(Vid, V)](index.rdd.context,
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
-
   /**
    * Construct a new VertexSetRDD that is indexed by only the keys in the RDD.
    * The resulting VertexSet will be based on a different index and can
@@ -96,7 +96,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
    */
    def reindex(): VertexSetRDD[V] = VertexSetRDD(this)
 
-
   /**
    * An internal representation which joins the block indices with the values
    * This is used by the compute function to emulate RDD[(Vid, V)]
@@ -104,19 +103,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
   protected[spark] val tuples =
     new ZippedRDD(index.rdd.context, index.rdd, valuesRDD)
 
-
   /**
    * The partitioner is defined by the index.
    */
   override val partitioner = index.rdd.partitioner
 
-
   /**
    * The actual partitions are defined by the tuples.
    */
   override def getPartitions: Array[Partition] = tuples.getPartitions
 
-
   /**
    * The preferred locations are computed based on the preferred
    * locations of the tuples.
@@ -124,7 +120,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
   override def getPreferredLocations(s: Partition): Seq[String] =
     tuples.getPreferredLocations(s)
 
-
   /**
    * Caching an VertexSetRDD causes the index and values to be cached separately.
    */
@@ -134,15 +129,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
     return this
   }
 
-
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def persist(): VertexSetRDD[V] = persist(StorageLevel.MEMORY_ONLY)
 
-
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   override def cache(): VertexSetRDD[V] = persist()
 
-
   /**
    * Provide the RDD[(K,V)] equivalent output.
    */
@@ -152,7 +144,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     }
   } // end of compute
 
-
   /**
    * Restrict the vertex set to the set of vertices satisfying the
    * given predicate.
@@ -190,7 +181,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     new VertexSetRDD[V](index, newValues)
   } // end of filter
 
-
   /**
    * Pass each vertex attribute through a map function and retain the
    * original RDD's partitioning and index.
@@ -214,7 +204,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
     new VertexSetRDD[U](index, newValuesRDD)
   } // end of mapValues
 
-
   /**
    * Pass each vertex attribute along with the vertex id through a map
    * function and retain the original RDD's partitioning and index.
@@ -247,8 +236,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
   /**
-   * @todo update docs to reflect function argument
-   *
    * Inner join this VertexSet with another VertexSet which has the
    * same Index.  This function will fail if both VertexSets do not
    * share the same index.  The resulting vertex set will only contain
@@ -257,6 +244,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   * this and the other vertex set to a new vertex attribute.
    * @return a VertexSetRDD containing only the vertices in both this
    * and the other VertexSet and with tuple attributes.
    *
@@ -287,13 +276,16 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
   /**
-   * @todo document
+   * Inner join this VertexSet with another VertexSet which has the
+   * same Index.  This function will fail if both VertexSets do not
+   * share the same index.
    *
-   * @param other
-   * @param f
-   * @tparam W
-   * @tparam Z
-   * @return
+   * @param other the vertex set to join with this vertex set
+   * @param f the function mapping a vertex id and its attributes in
+   * this and the other vertex set to a collection of tuples.
+   * @tparam W the type of the other vertex set attributes
+   * @tparam Z the type of the tuples emitted by `f`
+   * @return an RDD containing the tuples emitted by `f`
    */
   def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V,W) => Iterator[Z]):
   RDD[Z] = {
@@ -316,8 +308,6 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
   /**
-   * @todo update docs to reflect function argument
-
    * Left join this VertexSet with another VertexSet which has the
    * same Index.  This function will fail if both VertexSets do not
    * share the same index.  The resulting vertex set contains an entry
@@ -327,6 +317,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   * this and the other vertex set to a new vertex attribute.
    * @return a VertexSetRDD containing all the vertices in this
    * VertexSet with `None` attributes used for Vertices missing in the
    * other VertexSet.
@@ -368,11 +360,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * @tparam W the attribute type of the other VertexSet
    *
    * @param other the other VertexSet with which to join.
+   * @param f the function mapping a vertex id and its attributes in
+   * this and the other vertex set to a new vertex attribute.
    * @param merge the function used combine duplicate vertex
    * attributes
    * @return a VertexSetRDD containing all the vertices in this
-   * VertexSet with `None` attributes used for Vertices missing in the
-   * other VertexSet.
+   * VertexSet with the attribute emitted by f.
    *
    */
   def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
@@ -396,181 +389,9 @@ class VertexSetRDD[@specialized V: ClassManifest](
     }
   } // end of leftJoin
 
-
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this` as well as `other`.
-   */
-   /*
-  def cogroup[W: ClassManifest](other: RDD[(Vid, W)], partitioner: Partitioner):
-  VertexSetRDD[(Seq[V], Seq[W])] = {
-    //RDD[(K, (Seq[V], Seq[W]))] = {
-    other match {
-      case other: VertexSetRDD[_] if index == other.index => {
-        // if both RDDs share exactly the same index and therefore the same
-        // super set of keys then we simply merge the value RDDs.
-        // However it is possible that both RDDs are missing a value for a given key in
-        // which case the returned RDD should have a null value
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          valuesRDD.zipPartitions(other.valuesRDD){
-          (thisIter, otherIter) =>
-            val (thisValues, thisBS) = thisIter.next()
-            assert(!thisIter.hasNext)
-            val (otherValues, otherBS) = otherIter.next()
-            assert(!otherIter.hasNext)
-            /**
-             * @todo consider implementing this with a view as in leftJoin to
-             * reduce array allocations
-             */
-            val newValues = new Array[(Seq[V], Seq[W])](thisValues.size)
-            val newBS = thisBS | otherBS
-
-            var ind = newBS.nextSetBit(0)
-            while(ind >= 0) {
-              val a = if (thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
-              val b = if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
-              newValues(ind) = (a, b)
-              ind = newBS.nextSetBit(ind+1)
-            }
-            Iterator((newValues.toIndexedSeq, newBS))
-        }
-        new VertexSetRDD(index, newValues)
-      }
-      case other: VertexSetRDD[_]
-        if index.rdd.partitioner == other.index.rdd.partitioner => {
-        // If both RDDs are indexed using different indices but with the same partitioners
-        // then we we need to first merge the indicies and then use the merged index to
-        // merge the values.
-        val newIndex =
-          index.rdd.zipPartitions(other.index.rdd)(
-            (thisIter, otherIter) => {
-            val thisIndex = thisIter.next()
-            assert(!thisIter.hasNext)
-            val otherIndex = otherIter.next()
-            assert(!otherIter.hasNext)
-            // Merge the keys
-            val newIndex = new VertexIdToIndexMap(thisIndex.capacity + otherIndex.capacity)
-            var ind = thisIndex.nextPos(0)
-            while(ind >= 0) {
-              newIndex.fastAdd(thisIndex.getValue(ind))
-              ind = thisIndex.nextPos(ind+1)
-            }
-            var ind = otherIndex.nextPos(0)
-            while(ind >= 0) {
-              newIndex.fastAdd(otherIndex.getValue(ind))
-              ind = otherIndex.nextPos(ind+1)
-            }
-            List(newIndex).iterator
-          }).cache()
-        // Use the new index along with the this and the other indices to merge the values
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          newIndex.zipPartitions(tuples, other.tuples)(
-            (newIndexIter, thisTuplesIter, otherTuplesIter) => {
-              // Get the new index for this partition
-              val newIndex = newIndexIter.next()
-              assert(!newIndexIter.hasNext)
-              // Get the corresponding indicies and values for this and the other VertexSetRDD
-              val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
-              assert(!thisTuplesIter.hasNext)
-              val (otherIndex, (otherValues, otherBS)) = otherTuplesIter.next()
-              assert(!otherTuplesIter.hasNext)
-              // Preallocate the new Values array
-              val newValues = new Array[(Seq[V], Seq[W])](newIndex.size)
-              val newBS = new BitSet(newIndex.size)
-
-              // Lookup the sequences in both submaps
-              for ((k,ind) <- newIndex) {
-                // Get the left key
-                val a = if (thisIndex.contains(k)) {
-                  val ind = thisIndex.get(k)
-                  if(thisBS.get(ind)) Seq(thisValues(ind)) else Seq.empty[V]
-                } else Seq.empty[V]
-                // Get the right key
-                val b = if (otherIndex.contains(k)) {
-                  val ind = otherIndex.get(k)
-                  if (otherBS.get(ind)) Seq(otherValues(ind)) else Seq.empty[W]
-                } else Seq.empty[W]
-                // If at least one key was present then we generate a tuple.
-                if (!a.isEmpty || !b.isEmpty) {
-                  newValues(ind) = (a, b)
-                  newBS.set(ind)
-                }
-              }
-              Iterator((newValues.toIndexedSeq, newBS))
-            })
-        new VertexSetRDD(new VertexSetIndex(newIndex), newValues)
-      }
-      case _ => {
-        // Get the partitioner from the index
-        val partitioner = index.rdd.partitioner match {
-          case Some(p) => p
-          case None => throw new SparkException("An index must have a partitioner.")
-        }
-        // Shuffle the other RDD using the partitioner for this index
-        val otherShuffled =
-          if (other.partitioner == Some(partitioner)) {
-            other
-          } else {
-            other.partitionBy(partitioner)
-          }
-        // Join the other RDD with this RDD building a new valueset and new index on the fly
-        val groups = tuples.zipPartitions(otherShuffled)(
-          (thisTuplesIter, otherTuplesIter) => {
-            // Get the corresponding indicies and values for this VertexSetRDD
-            val (thisIndex, (thisValues, thisBS)) = thisTuplesIter.next()
-            assert(!thisTuplesIter.hasNext())
-            // Construct a new index
-            val newIndex = thisIndex.clone().asInstanceOf[VertexIdToIndexMap]
-            // Construct a new array Buffer to store the values
-            val newValues = ArrayBuffer.fill[ (Seq[V], Seq[W]) ](thisValues.size)(null)
-            val newBS = new BitSet(thisValues.size)
-            // populate the newValues with the values in this VertexSetRDD
-            for ((k,i) <- thisIndex) {
-              if (thisBS.get(i)) {
-                newValues(i) = (Seq(thisValues(i)), ArrayBuffer.empty[W])
-                newBS.set(i)
-              }
-            }
-            // Now iterate through the other tuples updating the map
-            for ((k,w) <- otherTuplesIter){
-              if (newIndex.contains(k)) {
-                val ind = newIndex.get(k)
-                if(newBS.get(ind)) {
-                  newValues(ind)._2.asInstanceOf[ArrayBuffer[W]].append(w)
-                } else {
-                  // If the other key was in the index but not in the values
-                  // of this indexed RDD then create a new values entry for it
-                  newBS.set(ind)
-                  newValues(ind) = (Seq.empty[V], ArrayBuffer(w))
-                }
-              } else {
-                // update the index
-                val ind = newIndex.size
-                newIndex.put(k, ind)
-                newBS.set(ind)
-                // Update the values
-                newValues.append( (Seq.empty[V], ArrayBuffer(w) ) )
-              }
-            }
-            Iterator( (newIndex, (newValues.toIndexedSeq, newBS)) )
-          }).cache()
-
-        // Extract the index and values from the above RDD
-        val newIndex = groups.mapPartitions(_.map{ case (kMap,vAr) => kMap }, true)
-        val newValues: RDD[(IndexedSeq[(Seq[V], Seq[W])], BitSet)] =
-          groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
-
-        new VertexSetRDD[(Seq[V], Seq[W])](new VertexSetIndex(newIndex), newValues)
-      }
-    }
-  } // end of cogroup
- */
-
 } // End of VertexSetRDD
 
 
-
 /**
  * The VertexSetRDD singleton is used to construct VertexSets
  */
@@ -627,7 +448,6 @@ object VertexSetRDD {
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply
 
-
   /**
    * Construct a vertex set from an RDD using an existing index.
    *
@@ -642,7 +462,6 @@ object VertexSetRDD {
     rdd: RDD[(Vid,V)], index: VertexSetIndex): VertexSetRDD[V] =
     apply(rdd, index, (a:V,b:V) => a)
 
-
   /**
    * Construct a vertex set from an RDD using an existing index and a
    * user defined `combiner` to merge duplicate vertices.
@@ -659,8 +478,17 @@ object VertexSetRDD {
     reduceFunc: (V, V) => V): VertexSetRDD[V] =
     apply(rdd,index, (v:V) => v, reduceFunc, reduceFunc)
 
-
-  def aggregate[V: ClassManifest](
+  /**
+   * Construct a vertex set from an RDD of AggregationMsgs
+   *
+   * @tparam V the vertex attribute type
+   * @param rdd the RDD of AggregationMsgs to merge into the vertex set
+   * @param index the index which must be a superset of the vertices
+   * in `rdd`
+   * @param reduceFunc the user defined reduce function used to merge
+   * duplicate vertex attributes.
+   */
+  private[spark] def aggregate[V: ClassManifest](
     rdd: RDD[AggregationMsg[V]], index: VertexSetIndex,
     reduceFunc: (V, V) => V): VertexSetRDD[V] = {
 
@@ -696,7 +524,6 @@ object VertexSetRDD {
     new VertexSetRDD(index, values)
   }
 
-
   /**
    * Construct a vertex set from an RDD using an existing index and a
    * user defined `combiner` to merge duplicate vertices.
@@ -767,7 +594,6 @@ object VertexSetRDD {
     new VertexSetRDD(index, values)
   } // end of apply
 
-
   /**
    * Construct an index of the unique vertices.  The resulting index
    * can be used to build VertexSets over subsets of the vertices in
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index defe34dfc9dd9f9997437c016a4b03e3dc88edc3..693bb888bca265a9b73720c63968cd26e542d6e0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -12,6 +12,7 @@ import org.apache.spark.util.ClosureCleaner
 import org.apache.spark.graph._
 import org.apache.spark.graph.impl.GraphImpl._
 import org.apache.spark.graph.impl.MsgRDDFunctions._
+import org.apache.spark.graph.util.BytecodeUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHashMap}
@@ -21,9 +22,9 @@ import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveKeyOpenHa
  * The Iterator type returned when constructing edge triplets
  */
 class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
-  val vidToIndex: VertexIdToIndexMap,
-  val vertexArray: Array[VD],
-  val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
+    val vidToIndex: VertexIdToIndexMap,
+    val vertexArray: Array[VD],
+    val edgePartition: EdgePartition[ED]) extends Iterator[EdgeTriplet[VD, ED]] {
 
   private var pos = 0
   private val et = new EdgeTriplet[VD, ED]
@@ -62,27 +63,25 @@ class EdgeTripletIterator[VD: ClassManifest, ED: ClassManifest](
 
 /**
  * A Graph RDD that supports computation on graphs.
+ *
+ * @param localVidMap Stores the location of vertex attributes after they are
+ * replicated. Within each partition, localVidMap holds a map from vertex ID to
+ * the index where that vertex's attribute is stored. This index refers to the
+ * arrays in the same partition in the variants of
+ * [[VTableReplicatedValues]]. Therefore, localVidMap can be reused across
+ * changes to the vertex attributes.
  */
 class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     @transient val vTable: VertexSetRDD[VD],
-    @transient val vid2pid: VertexSetRDD[Array[Pid]],
+    @transient val vid2pid: Vid2Pid,
     @transient val localVidMap: RDD[(Pid, VertexIdToIndexMap)],
     @transient val eTable: RDD[(Pid, EdgePartition[ED])] )
   extends Graph[VD, ED] {
 
   def this() = this(null, null, null, null)
 
-  /**
-   * (localVidMap: VertexSetRDD[Pid, VertexIdToIndexMap]) is a version of the
-   * vertex data after it is replicated. Within each partition, it holds a map
-   * from vertex ID to the index where that vertex's attribute is stored. This
-   * index refers to an array in the same partition in vTableReplicatedValues.
-   *
-   * (vTableReplicatedValues: VertexSetRDD[Pid, Array[VD]]) holds the vertex data
-   * and is arranged as described above.
-   */
-  @transient val vTableReplicatedValues: RDD[(Pid, Array[VD])] =
-    createVTableReplicated(vTable, vid2pid, localVidMap)
+  @transient val vTableReplicatedValues: VTableReplicatedValues[VD] =
+    new VTableReplicatedValues(vTable, vid2pid, localVidMap)
 
   /** Return a RDD of vertices. */
   @transient override val vertices = vTable
@@ -94,7 +93,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
 
   /** Return a RDD that brings edges with its source and destination vertices together. */
   @transient override val triplets: RDD[EdgeTriplet[VD, ED]] =
-    makeTriplets(localVidMap, vTableReplicatedValues, eTable)
+    makeTriplets(localVidMap, vTableReplicatedValues.bothAttrs, eTable)
 
   override def persist(newLevel: StorageLevel): Graph[VD, ED] = {
     eTable.persist(newLevel)
@@ -110,15 +109,22 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
   override def statistics: Map[String, Any] = {
     val numVertices = this.numVertices
     val numEdges = this.numEdges
-    val replicationRatio =
-      vid2pid.map(kv => kv._2.size).sum / vTable.count
+    val replicationRatioBothAttrs =
+      vid2pid.bothAttrs.map(kv => kv._2.size).sum / numVertices
+    val replicationRatioSrcAttrOnly =
+      vid2pid.srcAttrOnly.map(kv => kv._2.size).sum / numVertices
+    val replicationRatioDstAttrOnly =
+      vid2pid.dstAttrOnly.map(kv => kv._2.size).sum / numVertices
     val loadArray =
       eTable.map{ case (pid, epart) => epart.data.size }.collect.map(x => x.toDouble / numEdges)
     val minLoad = loadArray.min
     val maxLoad = loadArray.max
     Map(
       "Num Vertices" -> numVertices, "Num Edges" -> numEdges,
-      "Replication" -> replicationRatio, "Load Array" -> loadArray,
+      "Replication (both)" -> replicationRatioBothAttrs,
+      "Replication (src only)" -> replicationRatioSrcAttrOnly,
+      "Replication (dest only)" -> replicationRatioDstAttrOnly,
+      "Load Array" -> loadArray,
       "Min Load" -> minLoad, "Max Load" -> maxLoad)
   }
 
@@ -161,18 +167,18 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     traverseLineage(vTable, "  ", visited)
     visited += (vTable.id -> "vTable")
 
-    println("\n\nvid2pid -----------------------------------------")
-    traverseLineage(vid2pid, "  ", visited)
-    visited += (vid2pid.id -> "vid2pid")
-    visited += (vid2pid.valuesRDD.id -> "vid2pid.values")
+    println("\n\nvid2pid.bothAttrs -------------------------------")
+    traverseLineage(vid2pid.bothAttrs, "  ", visited)
+    visited += (vid2pid.bothAttrs.id -> "vid2pid")
+    visited += (vid2pid.bothAttrs.valuesRDD.id -> "vid2pid.bothAttrs")
 
     println("\n\nlocalVidMap -------------------------------------")
     traverseLineage(localVidMap, "  ", visited)
     visited += (localVidMap.id -> "localVidMap")
 
-    println("\n\nvTableReplicatedValues --------------------------")
-    traverseLineage(vTableReplicatedValues, "  ", visited)
-    visited += (vTableReplicatedValues.id -> "vTableReplicatedValues")
+    println("\n\nvTableReplicatedValues.bothAttrs ----------------")
+    traverseLineage(vTableReplicatedValues.bothAttrs, "  ", visited)
+    visited += (vTableReplicatedValues.bothAttrs.id -> "vTableReplicatedValues.bothAttrs")
 
     println("\n\ntriplets ----------------------------------------")
     traverseLineage(triplets, "  ", visited)
@@ -232,7 +238,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     // Construct the Vid2Pid map. Here we assume that the filter operation
     // behaves deterministically.
     // @todo reindex the vertex and edge tables
-    val newVid2Pid = createVid2Pid(newETable, newVTable.index)
+    val newVid2Pid = new Vid2Pid(newETable, newVTable.index)
     val newVidMap = createLocalVidMap(newETable)
 
     new GraphImpl(newVTable, newVid2Pid, localVidMap, newETable)
@@ -328,7 +334,7 @@ object GraphImpl {
      *
      */
     val etable = createETable(edges)
-    val vid2pid = createVid2Pid(etable, vtable.index)
+    val vid2pid = new Vid2Pid(etable, vtable.index)
     val localVidMap = createLocalVidMap(etable)
     new GraphImpl(vtable, vid2pid, localVidMap, etable)
   }
@@ -367,24 +373,9 @@ object GraphImpl {
     }, preservesPartitioning = true).cache()
   }
 
-  protected def createVid2Pid[ED: ClassManifest](
-    eTable: RDD[(Pid, EdgePartition[ED])],
-    vTableIndex: VertexSetIndex): VertexSetRDD[Array[Pid]] = {
-    val preAgg = eTable.mapPartitions { iter =>
-      val (pid, edgePartition) = iter.next()
-      val vSet = new VertexSet
-      edgePartition.foreach(e => {vSet.add(e.srcId); vSet.add(e.dstId)})
-      vSet.iterator.map { vid => (vid.toLong, pid) }
-    }.partitionBy(vTableIndex.rdd.partitioner.get)
-    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
-      (p: Pid) => ArrayBuffer(p),
-      (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
-      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
-      .mapValues(a => a.toArray).cache()
-  }
-
-  protected def createLocalVidMap[ED: ClassManifest](eTable: RDD[(Pid, EdgePartition[ED])]):
-    RDD[(Pid, VertexIdToIndexMap)] = {
+  private def createLocalVidMap(
+      eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED }
+    ): RDD[(Pid, VertexIdToIndexMap)] = {
     eTable.mapPartitions( _.map{ case (pid, epart) =>
       val vidToIndex = new VertexIdToIndexMap
       epart.foreach{ e =>
@@ -395,36 +386,6 @@ object GraphImpl {
     }, preservesPartitioning = true).cache()
   }
 
-  protected def createVTableReplicated[VD: ClassManifest](
-      vTable: VertexSetRDD[VD],
-      vid2pid: VertexSetRDD[Array[Pid]],
-      replicationMap: RDD[(Pid, VertexIdToIndexMap)]):
-    RDD[(Pid, Array[VD])] = {
-    // Join vid2pid and vTable, generate a shuffle dependency on the joined
-    // result, and get the shuffle id so we can use it on the slave.
-    val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
-      // TODO(rxin): reuse VertexBroadcastMessage
-      pids.iterator.map { pid =>
-        new VertexBroadcastMsg[VD](pid, vid, vdata)
-      }
-    }.partitionBy(replicationMap.partitioner.get).cache()
-
-    replicationMap.zipPartitions(msgsByPartition){
-      (mapIter, msgsIter) =>
-      val (pid, vidToIndex) = mapIter.next()
-      assert(!mapIter.hasNext)
-      // Populate the vertex array using the vidToIndex map
-      val vertexArray = new Array[VD](vidToIndex.capacity)
-      for (msg <- msgsIter) {
-        val ind = vidToIndex.getPos(msg.vid) & OpenHashSet.POSITION_MASK
-        vertexArray(ind) = msg.data
-      }
-      Iterator((pid, vertexArray))
-    }.cache()
-
-    // @todo assert edge table has partitioner
-  }
-
   def makeTriplets[VD: ClassManifest, ED: ClassManifest](
     localVidMap: RDD[(Pid, VertexIdToIndexMap)],
     vTableReplicatedValues: RDD[(Pid, Array[VD]) ],
@@ -441,7 +402,7 @@ object GraphImpl {
   def mapTriplets[VD: ClassManifest, ED: ClassManifest, ED2: ClassManifest](
     g: GraphImpl[VD, ED],
     f: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
-    val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+    val newETable = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues.bothAttrs){
       (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
       val (pid, edgePartition) = edgePartitionIter.next()
       val (_, vidToIndex) = vidToIndexIter.next()
@@ -467,8 +428,16 @@ object GraphImpl {
     ClosureCleaner.clean(mapFunc)
     ClosureCleaner.clean(reduceFunc)
 
+    // For each vertex, replicate its attribute only to partitions where it is
+    // in the relevant position in an edge.
+    val mapFuncUsesSrcAttr = accessesVertexAttr[VD, ED](mapFunc, "srcAttr")
+    val mapFuncUsesDstAttr = accessesVertexAttr[VD, ED](mapFunc, "dstAttr")
+
     // Map and preaggregate
-    val preAgg = g.eTable.zipPartitions(g.localVidMap, g.vTableReplicatedValues){
+    val preAgg = g.eTable.zipPartitions(
+      g.localVidMap,
+      g.vTableReplicatedValues.get(mapFuncUsesSrcAttr, mapFuncUsesDstAttr)
+    ){
       (edgePartitionIter, vidToIndexIter, vertexArrayIter) =>
       val (_, edgePartition) = edgePartitionIter.next()
       val (_, vidToIndex) = vidToIndexIter.next()
@@ -487,8 +456,12 @@ object GraphImpl {
 
       edgePartition.foreach { e =>
         et.set(e)
-        et.srcAttr = vmap(e.srcId)
-        et.dstAttr = vmap(e.dstId)
+        if (mapFuncUsesSrcAttr) {
+          et.srcAttr = vmap(e.srcId)
+        }
+        if (mapFuncUsesDstAttr) {
+          et.dstAttr = vmap(e.dstId)
+        }
         // TODO(rxin): rewrite the foreach using a simple while loop to speed things up.
         // Also given we are only allowing zero, one, or two messages, we can completely unroll
         // the for loop.
@@ -591,4 +564,13 @@ object GraphImpl {
     math.abs((lower, higher).hashCode()) % numParts
   }
 
+  private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
+      closure: AnyRef, attrName: String): Boolean = { // attrName: e.g. "srcAttr" or "dstAttr"
+    try { // NOTE(review): presumably inspects closure's bytecode for calls to attrName -- confirm
+      BytecodeUtils.invokedMethod(closure, classOf[EdgeTriplet[VD, ED]], attrName)
+    } catch {
+      case _: ClassNotFoundException => true // unknown: conservatively report it as accessed
+    }
+  }
+
 } // end of object GraphImpl
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala
new file mode 100644
index 0000000000000000000000000000000000000000..fee2d40ee4aa16ded3c1af437c14b377a60dd8a4
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/VTableReplicatedValues.scala
@@ -0,0 +1,84 @@
+package org.apache.spark.graph.impl
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.{OpenHashSet, PrimitiveKeyOpenHashMap}
+
+import org.apache.spark.graph._
+import org.apache.spark.graph.impl.MsgRDDFunctions._
+
+/**
+ * Holds the replicated vertex attribute arrays, with one RDD for each combination of the
+ * source/destination attribute flags. See the description of localVidMap in [[GraphImpl]].
+ */
+class VTableReplicatedValues[VD: ClassManifest](
+    vTable: VertexSetRDD[VD],
+    vid2pid: Vid2Pid,
+    localVidMap: RDD[(Pid, VertexIdToIndexMap)]) {
+
+  val bothAttrs: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, true)
+  val srcAttrOnly: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, true, false)
+  val dstAttrOnly: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, true)
+  val noAttrs: RDD[(Pid, Array[VD])] =
+    VTableReplicatedValues.createVTableReplicated(vTable, vid2pid, localVidMap, false, false)
+
+  /** Returns the replicated RDD that was built with the given pair of flags. */
+  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {
+    if (includeSrcAttr) {
+      if (includeDstAttr) bothAttrs else srcAttrOnly
+    } else {
+      if (includeDstAttr) dstAttrOnly else noAttrs
+    }
+  }
+}
+/** Parallel arrays shipped to one partition: attrs(i) is the attribute of vertex vids(i). */
+class VertexAttributeBlock[VD: ClassManifest](val vids: Array[Vid], val attrs: Array[VD])
+
+object VTableReplicatedValues {
+  /** Replicates the selected vertex attributes into one array per partition of localVidMap. */
+  protected def createVTableReplicated[VD: ClassManifest](
+      vTable: VertexSetRDD[VD],
+      vid2pid: Vid2Pid,
+      localVidMap: RDD[(Pid, VertexIdToIndexMap)],
+      includeSrcAttr: Boolean,
+      includeDstAttr: Boolean): RDD[(Pid, Array[VD])] = {
+
+    // Per vertex partition: the vertex ids requested by each destination pid
+    val routingTable = vid2pid.getPid2Vid(includeSrcAttr, includeDstAttr)
+
+    val shippedBlocks = routingTable.zipPartitions(vTable.index.rdd, vTable.valuesRDD) {
+      (routeIter, indexIter, valuesIter) =>
+      val wantedVids = routeIter.next()
+      val index = indexIter.next()
+      val values = valuesIter.next()
+      val attrOf = new PrimitiveKeyOpenHashMap(index, values._1)
+
+      // Build one (pid, block) message for every destination pid
+      val output = new Array[(Pid, VertexAttributeBlock[VD])](wantedVids.size)
+      var p = 0
+      while (p < output.length) {
+        val vids = wantedVids(p)
+        output(p) = (p, new VertexAttributeBlock(vids, vids.map(vid => attrOf(vid))))
+        p += 1
+      }
+      output.iterator
+    }.partitionBy(localVidMap.partitioner.get).cache()
+
+    localVidMap.zipPartitions(shippedBlocks) {
+      (mapIter, blockIter) =>
+      val (pid, vidToIndex) = mapIter.next()
+      assert(!mapIter.hasNext)
+      // Store every received attribute at the position vidToIndex gives its vertex id
+      val vertexArray = new Array[VD](vidToIndex.capacity)
+      for ((_, block) <- blockIter; i <- 0 until block.vids.size) {
+        val slot = vidToIndex.getPos(block.vids(i)) & OpenHashSet.POSITION_MASK
+        vertexArray(slot) = block.attrs(i)
+      }
+      Iterator((pid, vertexArray))
+    }.cache()
+  }
+
+}
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala
new file mode 100644
index 0000000000000000000000000000000000000000..363adbbce949e2ed66005bc2b8c2115eee2bd81a
--- /dev/null
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Vid2Pid.scala
@@ -0,0 +1,87 @@
+package org.apache.spark.graph.impl
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
+
+import org.apache.spark.graph._
+
+/**
+ * Stores the layout of vertex attributes for GraphImpl.
+ */
+class Vid2Pid(
+    eTable: RDD[(Pid, EdgePartition[ED])] forSome { type ED },
+    vTableIndex: VertexSetIndex) {
+
+  // For each vertex id, the pids of the edge partitions that hold it in a selected position
+  val bothAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(true, true)
+  val srcAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(true, false)
+  val dstAttrOnly: VertexSetRDD[Array[Pid]] = createVid2Pid(false, true)
+  val noAttrs: VertexSetRDD[Array[Pid]] = createVid2Pid(false, false)
+
+  // The mappings above regrouped by pid: for each pid, the vertex ids it wants
+  val pid2VidBothAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(bothAttrs)
+  val pid2VidSrcAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(srcAttrOnly)
+  val pid2VidDstAttrOnly: RDD[Array[Array[Vid]]] = createPid2Vid(dstAttrOnly)
+  val pid2VidNoAttrs: RDD[Array[Array[Vid]]] = createPid2Vid(noAttrs)
+
+  def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] =
+    (includeSrcAttr, includeDstAttr) match {
+      case (true, true) => bothAttrs
+      case (true, false) => srcAttrOnly
+      case (false, true) => dstAttrOnly
+      case (false, false) => noAttrs
+    }
+
+  def getPid2Vid(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[Vid]]] =
+    (includeSrcAttr, includeDstAttr) match {
+      case (true, true) => pid2VidBothAttrs
+      case (true, false) => pid2VidSrcAttrOnly
+      case (false, true) => pid2VidDstAttrOnly
+      case (false, false) => pid2VidNoAttrs
+    }
+
+  /** Applies `newLevel` to the four vid-to-pid RDDs; the pid-to-vid RDDs are left untouched. */
+  def persist(newLevel: StorageLevel): Unit = {
+    Seq(bothAttrs, srcAttrOnly, dstAttrOnly, noAttrs).foreach(_.persist(newLevel))
+  }
+
+  /** Maps each vertex id to the pids of the edge partitions holding it as src and/or dst. */
+  private def createVid2Pid(
+      includeSrcAttr: Boolean, includeDstAttr: Boolean): VertexSetRDD[Array[Pid]] = {
+    val preAgg = eTable.mapPartitions { iter =>
+      // Only the first element of the partition is read (presumably one EdgePartition each)
+      val (pid, edgePartition) = iter.next()
+      val vSet = new VertexSet
+      if (includeSrcAttr || includeDstAttr) {
+        edgePartition.foreach(e => {
+          if (includeSrcAttr) vSet.add(e.srcId)
+          if (includeDstAttr) vSet.add(e.dstId)
+        })
+      }
+      vSet.iterator.map { vid => (vid.toLong, pid) }
+    }
+    VertexSetRDD[Pid, ArrayBuffer[Pid]](preAgg, vTableIndex,
+      (p: Pid) => ArrayBuffer(p),
+      (ab: ArrayBuffer[Pid], p:Pid) => {ab.append(p); ab},
+      (a: ArrayBuffer[Pid], b: ArrayBuffer[Pid]) => a ++ b)
+      .mapValues(a => a.toArray).cache()
+  }
+
+  /**
+   * Creates an intermediate pid2vid structure that tells each partition of the vertex data
+   * where it should go. Assumes every pid is a valid partition index of eTable.
+   */
+  private def createPid2Vid(vid2pid: VertexSetRDD[Array[Pid]]): RDD[Array[Array[Vid]]] = {
+    // Buckets are indexed by edge-partition pid, so size them by eTable, not by vid2pid
+    val numPartitions = eTable.partitions.size
+    vid2pid.mapPartitions { iter =>
+      val pid2vidLocal = Array.fill[ArrayBuilder[Vid]](numPartitions)(ArrayBuilder.make[Vid])
+      for ((vid, pids) <- iter; pid <- pids) pid2vidLocal(pid) += vid
+      Iterator(pid2vidLocal.map(_.result))
+    }
+  }
+}
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index ec548bda1606924e43c6d0c31f97323b4c6b07d2..9c22608554671e8b6f98bafbd59219074a1539fc 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -33,6 +33,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("mapReduceTriplets") {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val n = 3
+      // Star graph: vertex 0 is connected to each of the vertices 1..n
+      val star = Graph(sc.parallelize((1 to n).map(leaf => (0: Vid, leaf: Vid))))
+      val neighborDegreeSums = star.mapReduceTriplets(
+        triplet => Array((triplet.srcId, triplet.dstAttr), (triplet.dstId, triplet.srcAttr)),
+        (left: Int, right: Int) => left + right)
+      assert(neighborDegreeSums.collect().toSet === (0 to n).map(vid => (vid, n)).toSet)
+    }
+  }
+
   test("aggregateNeighbors") {
     withSpark(new SparkContext("local", "test")) { sc =>
       val n = 3
@@ -87,6 +99,6 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
 
     }
-  } 
-  
+  }
+
 }