From 2dc9ec23874eb7174183134bb2ae6050b4ff270d Mon Sep 17 00:00:00 2001
From: "Joseph E. Gonzalez" <joseph.e.gonzalez@gmail.com>
Date: Tue, 5 Nov 2013 01:15:12 -0800
Subject: [PATCH] Revert to array-based (materialized) output of all
 VertexSetRDD operations.

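VertexSetRDD now stores its values as a materialized (Array[V], BitSet)
pair rather than a lazy ((Int => V), BitSet) view, and zipJoin,
leftZipJoin, and leftJoin now take the merge function directly so that
results are computed eagerly into the output array. As a sketch of the
API change (adapted from the collectNeighborIds change in GraphOps
below), callers move from joining and then pattern matching on Options:

    graph.vertices.leftZipJoin(nbrs).mapValues {
      case (_, Some(nbrs)) => nbrs
      case (_, None) => Array.empty[Vid]
    }

to passing the function to the join itself:

    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
      nbrsOpt.getOrElse(Array.empty[Vid])
    }
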
---
 .../org/apache/spark/graph/GraphOps.scala     |  11 +-
 .../org/apache/spark/graph/VertexSetRDD.scala | 178 +++++++++---------
 .../apache/spark/graph/impl/GraphImpl.scala   |  11 +-
 .../apache/spark/graph/AnalyticsSuite.scala   |  36 ++--
 .../org/apache/spark/graph/GraphSuite.scala   |   6 +-
 5 files changed, 121 insertions(+), 121 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
index 8480ff29d3..8c7f4c25e2 100644
--- a/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/GraphOps.scala
@@ -154,11 +154,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       (vid, edge) => Some(Array(edge.otherVertexId(vid))),
       (a, b) => a ++ b,
       edgeDirection)
-
-    graph.vertices.leftZipJoin(nbrs).mapValues{
-      case (_, Some(nbrs)) => nbrs
-      case (_, None) => Array.empty[Vid]
-    }
+    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[Vid]) }
   } // end of collectNeighborIds
 
 
@@ -183,10 +179,7 @@ class GraphOps[VD: ClassManifest, ED: ClassManifest](graph: Graph[VD, ED]) {
       (a, b) => a ++ b,
       edgeDirection)
 
-    graph.vertices.leftZipJoin(nbrs).mapValues{
-      case (_, Some(nbrs)) => nbrs
-      case (_, None) => Array.empty[(Vid, VD)]
-    }
+    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) => nbrsOpt.getOrElse(Array.empty[(Vid, VD)]) }
   } // end of collectNeighbor
 
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
index f26e286003..5cb05998aa 100644
--- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala
@@ -98,7 +98,7 @@ class VertexSetIndex(private[spark] val rdd: RDD[VertexIdToIndexMap]) {
  */
 class VertexSetRDD[@specialized V: ClassManifest](
     @transient val index:  VertexSetIndex,
-    @transient val valuesRDD: RDD[ ( (Int => V), BitSet) ])
+    @transient val valuesRDD: RDD[ ( Array[V], BitSet) ])
   extends RDD[(Vid, V)](index.rdd.context, 
     List(new OneToOneDependency(index.rdd), new OneToOneDependency(valuesRDD)) ) {
 
@@ -182,7 +182,7 @@ class VertexSetRDD[@specialized V: ClassManifest](
     val cleanPred = index.rdd.context.clean(pred)
     val newValues = index.rdd.zipPartitions(valuesRDD){ 
       (keysIter: Iterator[VertexIdToIndexMap], 
-       valuesIter: Iterator[(Int => V, BitSet)]) => 
+       valuesIter: Iterator[(Array[V], BitSet)]) =>
       val index = keysIter.next()
       assert(keysIter.hasNext() == false)
       val (oldValues, bs) = valuesIter.next()
@@ -217,11 +217,12 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValues[U: ClassManifest](f: V => U): VertexSetRDD[U] = {
-    val newValuesRDD: RDD[ (Int => U, BitSet) ] = 
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[ (Array[U], BitSet) ] =
       valuesRDD.mapPartitions(iter => iter.map{ 
-        case (values, bs: BitSet) => 
-          val newValues: (Int => U) = 
-            (ind: Int) => if (bs.get(ind)) f(values(ind)) else null.asInstanceOf[U]
+        case (values, bs: BitSet) =>
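+          // Eagerly build the mapped array; only positions set in the bitset are valid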
+          val newValues = new Array[U](values.size)
+          bs.iterator.foreach { ind => newValues(ind) = cleanF(values(ind)) }
           (newValues, bs)
       }, preservesPartitioning = true)   
     new VertexSetRDD[U](index, newValuesRDD)
@@ -241,19 +242,18 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * VertexSetRDD retains the same index.
    */
   def mapValuesWithKeys[U: ClassManifest](f: (Vid, V) => U): VertexSetRDD[U] = {
-    val newValues: RDD[ (Int => U, BitSet) ] = 
+    val cleanF = index.rdd.context.clean(f)
+    val newValues: RDD[ (Array[U], BitSet) ] =
       index.rdd.zipPartitions(valuesRDD){ 
         (keysIter: Iterator[VertexIdToIndexMap], 
-         valuesIter: Iterator[(Int => V, BitSet)]) => 
+         valuesIter: Iterator[(Array[V], BitSet)]) =>
         val index = keysIter.next()
         assert(keysIter.hasNext() == false)
-        val (oldValues, bs: BitSet) = valuesIter.next()
+        val (values, bs: BitSet) = valuesIter.next()
         assert(valuesIter.hasNext() == false)
-        // Cosntruct a view of the map transformation
+        // Materialize the map transformation into a new array
-        val newValues: (Int => U) = (ind: Int) => {
-          if (bs.get(ind)) { f(index.getValueSafe(ind), oldValues(ind)) }
-          else { null.asInstanceOf[U] }
-        }
+        val newValues = new Array[U](index.capacity)
+        bs.iterator.foreach { ind => newValues(ind) = cleanF(index.getValueSafe(ind), values(ind)) }
         Iterator((newValues, bs))
       }
     new VertexSetRDD[U](index, newValues)
@@ -261,6 +261,8 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
   /**
+   * @todo update docs to reflect function argument
+   *
    * Inner join this VertexSet with another VertexSet which has the
    * same Index.  This function will fail if both VertexSets do not
    * share the same index.  The resulting vertex set will only contain
@@ -273,20 +275,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * and the other VertexSet and with tuple attributes.
    *
    */
-  def zipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,W)] = {
+  def zipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, W) => Z):
+    VertexSetRDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (Int => (V,W), BitSet) ] = 
-      valuesRDD.zipPartitions(other.valuesRDD){
-        (thisIter: Iterator[(Int => V, BitSet)], 
-          otherIter: Iterator[(Int => W, BitSet)]) => 
+    val newValuesRDD: RDD[ (Array[Z], BitSet) ] =
+      index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+        val index = indexIter.next()
+        assert(!indexIter.hasNext)
         val (thisValues, thisBS: BitSet) = thisIter.next()
         assert(!thisIter.hasNext)
         val (otherValues, otherBS: BitSet) = otherIter.next()
         assert(!otherIter.hasNext)
         val newBS: BitSet = thisBS & otherBS
-        val newValues: Int => (V,W) = (ind: Int) => (thisValues(ind), otherValues(ind)) 
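+        // Evaluate f eagerly for every vertex present in both bitsets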
+        val newValues = new Array[Z](index.capacity)
+        newBS.iterator.foreach { ind =>
+          newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind))
+        }
         Iterator((newValues, newBS))
       }
     new VertexSetRDD(index, newValuesRDD)
@@ -294,6 +301,37 @@ class VertexSetRDD[@specialized V: ClassManifest](
 
 
   /**
+   * Inner join this VertexSet with another VertexSet that shares the
+   * same index, and flatMap the function f over each matched pair.
+   * This function will fail if the two VertexSets do not share the
+   * same index.
+   *
+   * @param other the other VertexSet, which must share this set's index
+   * @param f the function mapped over each (vertex id, this value, other value) triple
+   * @tparam W the type of the other VertexSet's values
+   * @tparam Z the type of the values returned by f
+   * @return an RDD containing the values produced by f
+  def zipJoinFlatMap[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])
+    (f: (Vid, V, W) => Iterator[Z]): RDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    if(index != other.index) {
+      throw new SparkException("A zipJoinFlatMap can only be applied to RDDs with the same index!")
+    }
+    index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+      val index = indexIter.next()
+      assert(!indexIter.hasNext)
+      val (thisValues, thisBS: BitSet) = thisIter.next()
+      assert(!thisIter.hasNext)
+      val (otherValues, otherBS: BitSet) = otherIter.next()
+      assert(!otherIter.hasNext)
+      val newBS: BitSet = thisBS & otherBS
+      // No values array is materialized; results stream directly out of the iterator
+      newBS.iterator.flatMap { ind => cleanF(index.getValueSafe(ind), thisValues(ind), otherValues(ind)) }
+    }
+  }
+
+
+  /**
+   * @todo update docs to reflect function argument
+   *
    * Left join this VertexSet with another VertexSet which has the
    * same Index.  This function will fail if both VertexSets do not
    * share the same index.  The resulting vertex set contains an entry
@@ -308,20 +346,25 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * other VertexSet.
    *
    */
-  def leftZipJoin[W: ClassManifest](other: VertexSetRDD[W]): VertexSetRDD[(V,Option[W])] = {
+  def leftZipJoin[W: ClassManifest, Z: ClassManifest](other: VertexSetRDD[W])(f: (Vid, V, Option[W]) => Z):
+    VertexSetRDD[Z] = {
     if(index != other.index) {
       throw new SparkException("A zipJoin can only be applied to RDDs with the same index!")
     }
-    val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
-      valuesRDD.zipPartitions(other.valuesRDD){
-      (thisIter: Iterator[(Int => V, BitSet)], 
-        otherIter: Iterator[(Int => W, BitSet)]) => 
+    val cleanF = index.rdd.context.clean(f)
+    val newValuesRDD: RDD[(Array[Z], BitSet)] =
+      index.rdd.zipPartitions(valuesRDD, other.valuesRDD) { (indexIter, thisIter, otherIter) =>
+      val index = indexIter.next()
+      assert(!indexIter.hasNext)
       val (thisValues, thisBS: BitSet) = thisIter.next()
       assert(!thisIter.hasNext)
       val (otherValues, otherBS: BitSet) = otherIter.next()
       assert(!otherIter.hasNext)
-      val newValues: Int => (V, Option[W]) = (ind: Int) => 
-        (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
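+      // For each vertex in this set, look up the optional match in other and apply f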
+      val newValues = new Array[Z](index.capacity)
+      thisBS.iterator.foreach { ind =>
+        val otherV = if (otherBS.get(ind)) Option(otherValues(ind)) else None
+        newValues(ind) = cleanF(index.getValueSafe(ind), thisValues(ind), otherV)
+      }
       Iterator((newValues, thisBS))
     }
     new VertexSetRDD(index, newValuesRDD)
@@ -346,68 +389,29 @@ class VertexSetRDD[@specialized V: ClassManifest](
    * other VertexSet.
    *
    */
-  def leftJoin[W: ClassManifest](
-    other: RDD[(Vid,W)], merge: (W,W) => W = (a:W, b:W) => a):
-    VertexSetRDD[(V, Option[W]) ] = {
+  def leftJoin[W: ClassManifest, Z: ClassManifest](other: RDD[(Vid,W)])
+    (f: (Vid, V, Option[W]) => Z, merge: (W, W) => W = (a: W, b: W) => a):
+    VertexSetRDD[Z] = {
+    val cleanF = index.rdd.context.clean(f)
+    val cleanMerge = index.rdd.context.clean(merge)
     // Test if the other vertex is a VertexSetRDD to choose the optimal
     // join strategy
     other match {
       // If the other set is a VertexSetRDD and shares the same index as
       // this vertex set then we use the much more efficient leftZipJoin
       case other: VertexSetRDD[_] if index == other.index => {
-        leftZipJoin(other)
-      }    
+        // @todo handle case where other is a VertexSetRDD with a different index
+        leftZipJoin(other)(cleanF)
+      }
       case _ => {
-        // Otherwise we treat the other RDD as a collectiong of 
-        // vertex-attribute pairs.  
-        // If necessary shuffle the other RDD using the partitioner 
-        // for this VertexSet
-        val otherShuffled = 
-          if (other.partitioner == partitioner) other 
-          else other.partitionBy(partitioner.get)
-        // Compute the new values RDD
-        val newValuesRDD: RDD[ (Int => (V,Option[W]), BitSet) ] = 
-          index.rdd.zipPartitions(valuesRDD, otherShuffled) {
-          (thisIndexIter: Iterator[VertexIdToIndexMap], 
-            thisIter: Iterator[(Int => V, BitSet)], 
-            tuplesIter: Iterator[(Vid,W)]) =>
-          // Get the Index and values for this RDD
-          val index = thisIndexIter.next()
-          assert(!thisIndexIter.hasNext)
-          val (thisValues, thisBS) = thisIter.next()
-          assert(!thisIter.hasNext)
-          // Create a new array to store the values in the resulting VertexSet
-          val otherValues = new Array[W](index.capacity)
-          // track which values are matched with values in other
-          val otherBS = new BitSet(index.capacity)
-          for ((k,w) <- tuplesIter) {
-            // Get the location of the key in the index
-            val pos = index.getPos(k)
-            // Only if the key is already in the index
-            if ((pos & OpenHashSet.EXISTENCE_MASK) == 0) {
-              // Get the actual index
-              val ind = pos & OpenHashSet.POSITION_MASK
-              // If this value has already been seen then merge
-              if (otherBS.get(ind)) {
-                otherValues(ind) = merge(otherValues(ind), w)
-              } else { // otherwise just store the new value
-                otherBS.set(ind)
-                otherValues(ind) = w
-              }
-            }
-          }
-          // Some vertices in this vertex set may not have a corresponding
-          // tuple in the join and so a None value should be returned. 
-          val newValues: Int => (V, Option[W]) = (ind: Int) => 
-            (thisValues(ind), if (otherBS.get(ind)) Option(otherValues(ind)) else None)
-          Iterator((newValues, thisBS))
-        } // end of newValues
-        new VertexSetRDD(index, newValuesRDD) 
+        val indexedOther: VertexSetRDD[W] = VertexSetRDD(other, index, cleanMerge)
+        leftZipJoin(indexedOther)(cleanF)
       }
     }
   } // end of leftJoin
 
 
   /**
    * For each key k in `this` or `other`, return a resulting RDD that contains a 
    * tuple with the list of values for that key in `this` as well as `other`.
@@ -609,29 +613,30 @@ object VertexSetRDD {
    */
   def apply[V: ClassManifest](
     rdd: RDD[(Vid,V)], reduceFunc: (V, V) => V): VertexSetRDD[V] = {
+    val cReduceFunc = rdd.context.clean(reduceFunc)
     // Preaggregate and shuffle if necessary
     val preAgg = rdd.partitioner match {
       case Some(p) => rdd
       case None => 
         val partitioner = new HashPartitioner(rdd.partitions.size)
         // Preaggregation.
-        val aggregator = new Aggregator[Vid, V, V](v => v, reduceFunc, reduceFunc)
+        val aggregator = new Aggregator[Vid, V, V](v => v, cReduceFunc, cReduceFunc)
         rdd.mapPartitions(aggregator.combineValuesByKey, true).partitionBy(partitioner)
     } 
 
     val groups = preAgg.mapPartitions( iter => {
       val hashMap = new PrimitiveKeyOpenHashMap[Vid, V]
       for ((k,v) <- iter) {
-        hashMap.setMerge(k, v, reduceFunc)
+        hashMap.setMerge(k, v, cReduceFunc)
       }
       val index = hashMap.keySet
-      val values: Int => V = (ind: Int) => hashMap._values(ind)
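+      // Use the hash map's backing array directly as the materialized values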
+      val values = hashMap._values
       val bs = index.getBitSet
       Iterator( (index, (values, bs)) )
       }, true).cache
     // extract the index and the values
     val index = groups.mapPartitions(_.map{ case (kMap, vAr) => kMap }, true)
-    val values: RDD[(Int => V, BitSet)] = 
+    val values: RDD[(Array[V], BitSet)] =
       groups.mapPartitions(_.map{ case (kMap,vAr) => vAr }, true)
     new VertexSetRDD[V](new VertexSetIndex(index), values)
   } // end of apply
@@ -690,6 +695,9 @@ object VertexSetRDD {
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C): VertexSetRDD[C] = {
+    val cCreateCombiner = index.rdd.context.clean(createCombiner)
+    val cMergeValue = index.rdd.context.clean(mergeValue)
+    val cMergeCombiners = index.rdd.context.clean(mergeCombiners)
     // Get the index Partitioner
     val partitioner = index.rdd.partitioner match {
       case Some(p) => p
@@ -699,15 +707,15 @@ object VertexSetRDD {
     val partitioned = 
       if (rdd.partitioner != Some(partitioner)) {
         // Preaggregation.
-        val aggregator = new Aggregator[Vid, V, C](createCombiner, mergeValue, 
-          mergeCombiners)
+        val aggregator = new Aggregator[Vid, V, C](cCreateCombiner, cMergeValue,
+          cMergeCombiners)
         rdd.mapPartitions(aggregator.combineValuesByKey).partitionBy(partitioner)
       } else {
-        rdd.mapValues(x => createCombiner(x))
+        rdd.mapValues(x => cCreateCombiner(x))
       }
 
     // Use the index to build the new values table
-    val values: RDD[ (Int => C, BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
+    val values: RDD[ (Array[C], BitSet) ] = index.rdd.zipPartitions(partitioned)( (indexIter, tblIter) => {
       // There is only one map
       val index = indexIter.next()
       assert(!indexIter.hasNext())
@@ -724,14 +732,14 @@ object VertexSetRDD {
           val ind = pos & OpenHashSet.POSITION_MASK
           // If this value has already been seen then merge
           if (bs.get(ind)) {
-            values(ind) = mergeCombiners(values(ind), c)
+            values(ind) = cMergeCombiners(values(ind), c)
           } else { // otherwise just store the new value
             bs.set(ind)
             values(ind) = c
           }
         }
       }
-      Iterator(((ind: Int) => values(ind), bs))
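+      // Return the materialized array directly instead of wrapping it in a lookup closure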
+      Iterator((values, bs))
     })
     new VertexSetRDD(index, values)
   } // end of apply
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index b80713dbf4..83ff2d734c 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -315,8 +315,7 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
     (updates: RDD[(Vid, U)])(updateF: (Vid, VD, Option[U]) => VD2)
     : Graph[VD2, ED] = {
     ClosureCleaner.clean(updateF)
-    val newVTable = vTable.leftJoin(updates).mapValuesWithKeys(
-      (vid, vu) => updateF(vid, vu._1, vu._2) )
+    val newVTable = vTable.leftJoin(updates)(updateF)
     new GraphImpl(newVTable, vid2pid, localVidMap, eTable)
   }
 
@@ -437,11 +436,9 @@ object GraphImpl {
     RDD[(Pid, Array[VD])] = {
     // Join vid2pid and vTable, generate a shuffle dependency on the joined 
     // result, and get the shuffle id so we can use it on the slave.
-    val msgsByPartition = vTable.zipJoin(vid2pid)
-      .flatMap { case (vid, (vdata, pids)) =>
-        pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
-      }
-      .partitionBy(replicationMap.partitioner.get).cache()
+    val msgsByPartition = vTable.zipJoinFlatMap(vid2pid) { (vid, vdata, pids) =>
+      pids.iterator.map { pid => MessageToPartition(pid, (vid, vdata)) }
+    }.partitionBy(replicationMap.partitioner.get).cache()
    
     replicationMap.zipPartitions(msgsByPartition){ 
       (mapIter, msgsIter) =>
diff --git a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
index 8d0b2e0b02..0fb101a08c 100644
--- a/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/AnalyticsSuite.scala
@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext
 import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
 
 import org.apache.spark.graph.LocalSparkContext._
 
@@ -58,8 +59,9 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val prGraph1 = Analytics.pagerank(starGraph, 1, resetProb)
       val prGraph2 = Analytics.pagerank(starGraph, 2, resetProb)
     
-      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices)
-        .map{ case (vid, (pr1, pr2)) => if (pr1 != pr2) { 1 } else { 0 } }.sum
+      val notMatching = prGraph1.vertices.zipJoin(prGraph2.vertices) { (vid, pr1, pr2) =>
+        if (pr1 != pr2) { 1 } else { 0 }
+      }.map { case (vid, test) => test }.sum
       assert(notMatching === 0)
       prGraph2.vertices.foreach(println(_))
       val errors = prGraph2.vertices.map{ case (vid, pr) =>
@@ -70,10 +72,12 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       assert(errors.sum === 0)
 
       val prGraph3 = Analytics.deltaPagerank(starGraph, 0, resetProb)
-      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices).map{
-        case (_, (pr1, Some(pr2))) if(pr1 == pr2) => 0
-        case _ => 1
-      }.sum
+      val errors2 = prGraph2.vertices.leftJoin(prGraph3.vertices){ (vid, pr1, pr2Opt) =>
+        pr2Opt match {
+          case Some(pr2) if(pr1 == pr2) => 0
+          case _ => 1
+        }
+      }.map { case (vid, test) => test }.sum
       assert(errors2 === 0)
     }
   } // end of test Star PageRank
@@ -86,19 +90,17 @@ class AnalyticsSuite extends FunSuite with LocalSparkContext {
       val resetProb = 0.15
       val prGraph1 = Analytics.pagerank(gridGraph, 50, resetProb).cache()
       val prGraph2 = Analytics.deltaPagerank(gridGraph, 0.0001, resetProb).cache()
-      val error = prGraph1.vertices.zipJoin(prGraph2.vertices).map {
-        case (id, (a, b)) => (a - b) * (a - b)
-      }.sum
-      prGraph1.vertices.zipJoin(prGraph2.vertices)
-        .map{ case (id, (a,b)) => (id, (a,b, a-b))}.foreach(println(_))
+      val error = prGraph1.vertices.zipJoin(prGraph2.vertices) { case (id, a, b) => (a - b) * (a - b) }
+        .map { case (id, error) => error }.sum
+      prGraph1.vertices.zipJoin(prGraph2.vertices) { (id, a, b) => (a, b, a-b) }.foreach(println(_))
       println(error)
       assert(error < 1.0e-5)
-      val pr3 = sc.parallelize(GridPageRank(10,10, 50, resetProb))
-      val error2 = prGraph1.vertices.leftJoin(pr3).map {
-        case (id, (a, Some(b))) => (a - b) * (a - b)
-        case _ => 0 
-      }.sum
-      prGraph1.vertices.leftJoin(pr3).foreach(println( _ ))
+      val pr3: RDD[(Vid, Double)] = sc.parallelize(GridPageRank(10, 10, 50, resetProb))
+      val error2 = prGraph1.vertices.leftJoin(pr3) { (id, a, bOpt) =>
+        val b: Double = bOpt.get
+        (a - b) * (a - b)
+      }.map { case (id, error) => error }.sum
+      prGraph1.vertices.leftJoin(pr3) { (id, a, b) => (a, b) }.foreach(println(_))
       println(error2)
       assert(error2 < 1.0e-5)
     }
diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
index 2067b1613e..ec548bda16 100644
--- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala
@@ -78,13 +78,13 @@ class GraphSuite extends FunSuite with LocalSparkContext {
       val a = sc.parallelize((0 to 100).map(x => (x.toLong, x.toLong)), 5)
       val b = VertexSetRDD(a).mapValues(x => -x)
       assert(b.count === 101)
-      assert(b.leftJoin(a).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.leftJoin(a){ (id, a, bOpt) => a + bOpt.get }.map(x => x._2).reduce(_+_) === 0)
       val c = VertexSetRDD(a, b.index)
-      assert(b.leftJoin(c).mapValues(x => x._1 + x._2.get).map(x=> x._2).reduce(_+_) === 0)
+      assert(b.leftJoin(c){ (id, b, cOpt) => b + cOpt.get }.map(x => x._2).reduce(_+_) === 0)
       val d = c.filter(q => ((q._2 % 2) == 0))
       val e = a.filter(q => ((q._2 % 2) == 0))
       assert(d.count === e.count)
-      assert(b.zipJoin(c).mapValues(x => x._1 + x._2).map(x => x._2).reduce(_+_) === 0)
+      assert(b.zipJoin(c)((id, b, c) => b + c).map(x => x._2).reduce(_+_) === 0)
 
     }
   } 
-- 
GitLab