From 45f4c66122c57011e74c694a424756812ab77d99 Mon Sep 17 00:00:00 2001
From: Brennon York <brennon.york@capitalone.com>
Date: Mon, 16 Mar 2015 01:06:26 -0700
Subject: [PATCH] [SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in
 VertexRDD

Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other.

Author: Brennon York <brennon.york@capitalone.com>

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]
---
 .../scala/org/apache/spark/graphx/VertexRDD.scala   |  9 +++++++++
 .../apache/spark/graphx/impl/VertexRDDImpl.scala    |  4 ++++
 .../org/apache/spark/graphx/VertexRDDSuite.scala    | 13 +++++++++++++
 project/MimaExcludes.scala                          |  3 +++
 4 files changed, 29 insertions(+)

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7107..ad4bfe0772 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -121,6 +121,15 @@ abstract class VertexRDD[VD](
    */
   def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
 
+  /**
+   * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
+   * differing values; for values that are different, keeps the values from `other`. This is
+   * only guaranteed to work if the VertexRDDs share a common ancestor.
+   *
+   * @param other the other RDD[(VertexId, VD)] with which to diff against.
+   */
+  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
   /**
    * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
    * differing values; for values that are different, keeps the values from `other`. This is
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21314..125692ddaa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
   override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))
 
+  override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+  }
+
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
     val otherPartition = other match {
       case other: VertexRDD[_] if this.partitioner == other.partitioner =>
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd3aa..4f7a442ab5 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.scalatest.FunSuite
 
 import org.apache.spark.{HashPartitioner, SparkContext}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("diff with RDD[(VertexId, VD)]") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n).cache()
+      val flipEvens: RDD[(VertexId, Int)] =
+        sc.parallelize(0L to 100L)
+          .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
+      // diff should keep only the changed vertices
+      assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
+    }
+  }
+
   test("diff vertices with the non-equal number of partitions") {
     withSpark { sc =>
       val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0)))
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 627b2cea4d..a6b07fa7cd 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -181,6 +181,9 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
+          ) ++ Seq(
+            // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD
+            ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
           )
 
         case v if v.startsWith("1.2") =>
-- 
GitLab