Skip to content
Snippets Groups Projects
Commit 1dce9ce4 authored by Reynold Xin's avatar Reynold Xin
Browse files

Moved PartitionStrategy's into an object.

parent ae06d2c2
No related branches found
No related tags found
No related merge requests found
......@@ -4,96 +4,100 @@ package org.apache.spark.graphx
* Represents the way edges are assigned to edge partitions based on their source and destination
* vertex IDs.
*/
sealed trait PartitionStrategy extends Serializable {
trait PartitionStrategy extends Serializable {
/**
 * Returns the partition number for a given edge.
 *
 * @param src the source vertex ID of the edge
 * @param dst the destination vertex ID of the edge
 * @param numParts the total number of edge partitions
 * @return the partition ID the edge is assigned to; implementations are
 *         expected to return a value in `[0, numParts)`
 */
def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID
}
/**
* Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
* guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
*
* Suppose we have a graph with 11 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
*
* <pre>
* __________________________________
* v0 | P0 * | P1 | P2 * |
* v1 | **** | * | |
* v2 | ******* | ** | **** |
* v3 | ***** | * * | * |
* ----------------------------------
* v4 | P3 * | P4 *** | P5 ** * |
* v5 | * * | * | |
* v6 | * | ** | **** |
* v7 | * * * | * * | * |
* ----------------------------------
* v8 | P6 * | P7 * | P8 * *|
* v9 | * | * * | |
* v10 | * | ** | * * |
* v11 | * <-E | *** | ** |
* ----------------------------------
* </pre>
*
* The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
* processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
* that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
* row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
* replicated to at most `2 * sqrt(numParts)` machines.
*
* Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
* balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
* vertex locations.
*
* One of the limitations of this approach is that the number of machines must either be a perfect
* square. We partially address this limitation by computing the machine assignment to the next
* largest perfect square and then mapping back down to the actual number of machines.
* Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
* is used.
* Collection of built-in [[PartitionStrategy]] implementations.
*/
case object EdgePartition2D extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
val mixingPrime: VertexID = 1125899906842597L
val col: PartitionID = ((math.abs(src) * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = ((math.abs(dst) * mixingPrime) % ceilSqrtNumParts).toInt
(col * ceilSqrtNumParts + row) % numParts
object PartitionStrategy {
/**
 * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
 * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
 *
 * Suppose we have a graph with 11 vertices that we want to partition
 * over 9 machines. We can use the following sparse matrix representation:
 *
 * <pre>
 *       __________________________________
 *  v0   | P0 *     | P1       | P2    *  |
 *  v1   |  ****    |  *       |          |
 *  v2   |  ******* |      **  |  ****    |
 *  v3   |  *****   |  *  *    |       *  |
 *       ----------------------------------
 *  v4   | P3 *     | P4 ***   | P5 **  * |
 *  v5   |  *  *    |  *       |          |
 *  v6   |       *  |      **  |  ****    |
 *  v7   |  * * *   |  *  *    |       *  |
 *       ----------------------------------
 *  v8   | P6   *   | P7    *  | P8  *   *|
 *  v9   |     *    |  *    *  |          |
 *  v10  |       *  |      **  |  *  *    |
 *  v11  | * <-E    |  ***     |       ** |
 *       ----------------------------------
 * </pre>
 *
 * The edge denoted by `E` connects `v11` with `v1` and is assigned to processor `P6`. To get the
 * processor number we divide the matrix into `sqrt(numParts)` by `sqrt(numParts)` blocks. Notice
 * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
 * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
 * replicated to at most `2 * sqrt(numParts)` machines.
 *
 * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
 * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
 * vertex locations.
 *
 * One of the limitations of this approach is that the number of machines must be a perfect
 * square. We partially address this limitation by computing the machine assignment to the next
 * largest perfect square and then mapping back down to the actual number of machines.
 * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect square
 * is used.
 */
case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    // Large prime used to scatter vertex IDs across the blocks of the matrix.
    val mixingPrime: VertexID = 1125899906842597L
    // Take the absolute value AFTER the multiplication: the product deliberately
    // wraps around Long, so `math.abs(src) * mixingPrime` can be negative for any
    // vertex ID >= ~2^63 / mixingPrime, which previously yielded a negative
    // column/row and therefore a negative (invalid) partition ID. Taking `abs`
    // of the wrapped product keeps the modulus operand non-negative.
    // (NOTE(review): `math.abs(Long.MinValue)` is still negative, but the mixed
    // product hitting exactly Long.MinValue is a 1-in-2^64 corner case.)
    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
    // Map the (col, row) block back down to the actual number of partitions.
    (col * ceilSqrtNumParts + row) % numParts
  }
}
}
/**
* Assigns edges to partitions using only the source vertex ID, colocating edges with the same
* source.
*/
case object EdgePartition1D extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val mixingPrime: VertexID = 1125899906842597L
(math.abs(src) * mixingPrime).toInt % numParts
/**
 * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
 * source.
 */
case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    // Large prime used to scatter consecutive source IDs across partitions.
    val mixingPrime: VertexID = 1125899906842597L
    // Reduce modulo `numParts` in Long space BEFORE narrowing to Int:
    // the previous `(math.abs(src) * mixingPrime).toInt % numParts` truncated
    // the (intentionally overflowing) product to Int first, which can be
    // negative and therefore produced a negative (invalid) partition ID.
    (math.abs(src * mixingPrime) % numParts).toInt
  }
}
}
/**
* Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
* random vertex cut that colocates all same-direction edges between two vertices.
*/
case object RandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
math.abs((src, dst).hashCode()) % numParts
/**
 * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
 * random vertex cut that colocates all same-direction edges between two vertices.
 */
case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    // Apply the remainder BEFORE taking the absolute value: math.abs(Int.MinValue)
    // is still Int.MinValue, so the previous `math.abs(hashCode) % numParts` could
    // (very rarely) return a negative partition ID. For every other hash value
    // `abs(h % numParts)` equals `abs(h) % numParts`, so the assignment is
    // otherwise unchanged.
    math.abs((src, dst).hashCode() % numParts)
  }
}
}
/**
* Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
* direction, resulting in a random vertex cut that colocates all edges between two vertices,
* regardless of direction.
*/
case object CanonicalRandomVertexCut extends PartitionStrategy {
override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
val lower = math.min(src, dst)
val higher = math.max(src, dst)
math.abs((lower, higher).hashCode()) % numParts
/**
 * Assigns edges to partitions by hashing the source and destination vertex IDs in a canonical
 * direction, resulting in a random vertex cut that colocates all edges between two vertices,
 * regardless of direction.
 */
case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexID, dst: VertexID, numParts: PartitionID): PartitionID = {
    // Order the endpoints so that (a, b) and (b, a) hash to the same partition.
    val lower = math.min(src, dst)
    val higher = math.max(src, dst)
    // Remainder before abs: guards against hashCode() == Int.MinValue, whose
    // absolute value is itself negative and previously produced a negative
    // partition ID. Identical to `abs(hashCode) % numParts` for all other hashes.
    math.abs((lower, higher).hashCode() % numParts)
  }
}
}
......@@ -2,6 +2,7 @@ package org.apache.spark.graphx.lib
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.graphx.PartitionStrategy._
/**
* Driver program for running graph algorithms.
......@@ -20,6 +21,7 @@ object Analytics extends Logging {
}
def pickPartitioner(v: String): PartitionStrategy = {
// TODO: Use reflection rather than listing all the partitioning strategies here.
v match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
......
......@@ -4,6 +4,7 @@ import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
import org.apache.spark.rdd._
class GraphSuite extends FunSuite with LocalSparkContext {
......
......@@ -2,11 +2,8 @@ package org.apache.spark.graphx.lib
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.rdd._
import org.apache.spark.graphx.PartitionStrategy.RandomVertexCut
class TriangleCountSuite extends FunSuite with LocalSparkContext {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment