diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index f5b4c57f72902167d0560ed98a1cafbd72b2a439..ddc8cbf4fbde6c8e56cafc3b95ef11bcd7fecefd 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -1,6 +1,7 @@ package org.apache.spark.graph import org.apache.spark.rdd.RDD +import org.apache.spark.util.ClosureCleaner import org.apache.spark.storage.StorageLevel /** diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index d185f9f8cd7baa94fb130021daa8392006af6555..693bb888bca265a9b73720c63968cd26e542d6e0 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -253,8 +253,8 @@ class GraphImpl[VD: ClassManifest, ED: ClassManifest] protected ( .toList // groups all ETs in this partition that have the same src and dst // Because all ETs with the same src and dst will live on the same - // partition due to the EdgePartitioner, this guarantees that these - // ET groups will be complete. + // partition due to the canonicalRandomVertexCut partitioner, this + // guarantees that these ET groups will be complete. .groupBy { t: EdgeTriplet[VD, ED] => (t.srcId, t.dstId) } .mapValues { ts: List[EdgeTriplet[VD, ED]] => f(ts.toIterator) } .toList @@ -357,7 +357,6 @@ object GraphImpl { // val part: Pid = edgePartitionFunction1D(e.srcId, e.dstId, numPartitions) // val part: Pid = edgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) val part: Pid = randomVertexCut(e.srcId, e.dstId, numPartitions) - //val part: Pid = canonicalEdgePartitionFunction2D(e.srcId, e.dstId, numPartitions, ceilSqrt) // Should we be using 3-tuple or an optimized class new MessageToPartition(part, (e.srcId, e.dstId, e.attr)) @@ -555,18 +554,14 @@ object GraphImpl { } /** - * @todo This will only partition edges to the upper diagonal - * of the 2D processor space. + * Assign edges to an arbitrary machine corresponding to a random vertex cut. This + * function ensures that edges of opposite direction between the same two vertices + * will end up on the same partition. */ - protected def canonicalEdgePartitionFunction2D(srcOrig: Vid, dstOrig: Vid, - numParts: Pid, ceilSqrtNumParts: Pid): Pid = { - val mixingPrime: Vid = 1125899906842597L - // Partitions by canonical edge direction - val src = math.min(srcOrig, dstOrig) - val dst = math.max(srcOrig, dstOrig) - val col: Pid = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt - val row: Pid = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt - (col * ceilSqrtNumParts + row) % numParts + protected def canonicalRandomVertexCut(src: Vid, dst: Vid, numParts: Pid): Pid = { + val lower = math.min(src, dst) + val higher = math.max(src, dst) + math.abs((lower, higher).hashCode()) % numParts } private def accessesVertexAttr[VD: ClassManifest, ED: ClassManifest](