From 8d8056da14d3a7eac39f4cf14970467729350018 Mon Sep 17 00:00:00 2001
From: Dan Crankshaw <dscrankshaw@gmail.com>
Date: Wed, 30 Oct 2013 15:03:21 -0700
Subject: [PATCH] Fixed issue with canonical edge partitioner.

---
 .../scala/org/apache/spark/graph/Graph.scala  |  1 +
 .../apache/spark/graph/impl/GraphImpl.scala   | 23 ++++++++-----------
 2 files changed, 10 insertions(+), 14 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
index f5b4c57f72..ddc8cbf4fb 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala
@@ -1,6 +1,7 @@
 package org.apache.spark.graph
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.ClosureCleaner
 import org.apache.spark.storage.StorageLevel
 
 /**
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
index d185f9f8cd..693bb888bc 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala
@@ -253,8 +253,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected (
         .toList
         // groups all ETs in this partition that have the same src and dst
         // Because all ETs with the same src and dst will live on the same
-        // partition due to the EdgePartitioner, this guarantees that these
-        // ET groups will be complete.
+        // partition due to the canonicalRandomVertexCut partitioner, this
+        // guarantees that these ET groups will be complete.
         .groupBy { t: EdgeTriplet[VD, ED] =>  (t.srcId, t.dstId) }
         .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) }
         .toList
@@ -357,7 +357,6 @@ object GraphImpl {
       // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions)
       // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
       val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions)
-      //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt)
 
       // Should we be using 3-tuple or an optimized class
       new MessageToPartition(part, (e.srcId, e.dstId, e.attr))
@@ -555,18 +554,14 @@ object GraphImpl {
   }
 
   /**
-   * @todo This will only partition edges to the upper diagonal
-   * of the 2D processor space.
+   * Assign edges to an arbitrary machine corresponding to a random vertex cut. This
+   * function ensures that edges of opposite direction between the same two vertices
+   * will end up on the same partition.
    */
-  protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid,
-    numParts: Pid, ceilSqrtNumParts: Pid): Pid = {
-    val mixingPrime: Vid = 1125899906842597L
-    // Partitions by canonical edge direction
-    val src = math.min(srcOrig, dstOrig)
-    val dst = math.max(srcOrig, dstOrig)
-    val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
-    val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
-    (col * ceilSqrtNumParts + row) % numParts
+  protected def canonicalRandomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = {
+    val lower = math.min(src, dst)
+    val higher = math.max(src, dst)
+    math.abs((lower, higher).hashCode()) % numParts
   }
 
   private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](
-- 
GitLab