Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
S
spark
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cs525-sp18-g07
spark
Commits
a13460bb
Commit
a13460bb
authored
11 years ago
by
Dan Crankshaw
Committed by
Ankur Dave
11 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Updated documentation
parent
7c573a8b
No related branches found
Branches containing commit
No related tags found
Tags containing commit
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+54
-2
54 additions, 2 deletions
...main/scala/org/apache/spark/graph/PartitionStrategy.scala
with
54 additions
and
2 deletions
graph/src/main/scala/org/apache/spark/graph/PartitionStrategy.scala
+
54
−
2
View file @
a13460bb
...
@@ -5,7 +5,51 @@ package org.apache.spark.graph
...
@@ -5,7 +5,51 @@ package org.apache.spark.graph
sealed
trait
PartitionStrategy
extends
Serializable
{
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
}
sealed
trait
PartitionStrategy
extends
Serializable
{
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
}
//case object EdgePartition2D extends PartitionStrategy {
/**
* This function implements a classic 2D-Partitioning of a sparse matrix.
* Suppose we have a graph with 11 vertices that we want to partition
* over 9 machines. We can use the following sparse matrix representation:
*
* __________________________________
* v0 | P0 * | P1 | P2 * |
* v1 | **** | * | |
* v2 | ******* | ** | **** |
* v3 | ***** | * * | * |
* ----------------------------------
* v4 | P3 * | P4 *** | P5 ** * |
* v5 | * * | * | |
* v6 | * | ** | **** |
* v7 | * * * | * * | * |
* ----------------------------------
* v8 | P6 * | P7 * | P8 * *|
* v9 | * | * * | |
* v10 | * | ** | * * |
* v11 | * <-E | *** | ** |
* ----------------------------------
*
* The edge denoted by E connects v11 with v1 and is assigned to
* processor P6. To get the processor number we divide the matrix
* into sqrt(numProc) by sqrt(numProc) blocks. Notice that edges
* adjacent to v11 can only be in the first colum of
* blocks (P0, P3, P6) or the last row of blocks (P6, P7, P8).
* As a consequence we can guarantee that v11 will need to be
* replicated to at most 2 * sqrt(numProc) machines.
*
* Notice that P0 has many edges and as a consequence this
* partitioning would lead to poor work balance. To improve
* balance we first multiply each vertex id by a large prime
* to effectively shuffle the vertex locations.
*
* One of the limitations of this approach is that the number of
* machines must either be a perfect square. We partially address
* this limitation by computing the machine assignment to the next
* largest perfect square and then mapping back down to the actual
* number of machines. Unfortunately, this can also lead to work
* imbalance and so it is suggested that a perfect square is used.
*
*
*/
object
EdgePartition2D
extends
PartitionStrategy
{
object
EdgePartition2D
extends
PartitionStrategy
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
val
ceilSqrtNumParts
:
Pid
=
math
.
ceil
(
math
.
sqrt
(
numParts
)).
toInt
val
ceilSqrtNumParts
:
Pid
=
math
.
ceil
(
math
.
sqrt
(
numParts
)).
toInt
...
@@ -17,7 +61,6 @@ object EdgePartition2D extends PartitionStrategy {
...
@@ -17,7 +61,6 @@ object EdgePartition2D extends PartitionStrategy {
}
}
object
EdgePartition1D
extends
PartitionStrategy
{
object
EdgePartition1D
extends
PartitionStrategy
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
val
mixingPrime
:
Vid
=
1125899906842597L
val
mixingPrime
:
Vid
=
1125899906842597L
...
@@ -26,6 +69,10 @@ object EdgePartition1D extends PartitionStrategy {
...
@@ -26,6 +69,10 @@ object EdgePartition1D extends PartitionStrategy {
}
}
/**
* Assign edges to an aribtrary machine corresponding to a
* random vertex cut.
*/
object
RandomVertexCut
extends
PartitionStrategy
{
object
RandomVertexCut
extends
PartitionStrategy
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
math
.
abs
((
src
,
dst
).
hashCode
())
%
numParts
math
.
abs
((
src
,
dst
).
hashCode
())
%
numParts
...
@@ -33,6 +80,11 @@ object RandomVertexCut extends PartitionStrategy {
...
@@ -33,6 +80,11 @@ object RandomVertexCut extends PartitionStrategy {
}
}
/**
* Assign edges to an arbitrary machine corresponding to a random vertex cut. This
* function ensures that edges of opposite direction between the same two vertices
* will end up on the same partition.
*/
object
CanonicalRandomVertexCut
extends
PartitionStrategy
{
object
CanonicalRandomVertexCut
extends
PartitionStrategy
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
override
def
getPartition
(
src
:
Vid
,
dst
:
Vid
,
numParts
:
Pid
)
:
Pid
=
{
val
lower
=
math
.
min
(
src
,
dst
)
val
lower
=
math
.
min
(
src
,
dst
)
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment