cs525-sp18-g07 / spark

Commit a6de8021, authored 12 years ago by Mark Hamstra

    Merge branch 'master' of https://github.com/mesos/spark into WithThing

Parents: 562893be, cbf8f0d4

Showing 2 changed files
bagel/src/main/scala/spark/bagel/Bagel.scala  +89 −8  (89 additions, 8 deletions)
bagel/src/test/scala/bagel/BagelSuite.scala   +18 −0  (18 additions, 0 deletions)

Total: 107 additions and 8 deletions
bagel/src/main/scala/spark/bagel/Bagel.scala (+89 −8)
@@ -4,8 +4,38 @@ import spark._
 import spark.SparkContext._
 
 import scala.collection.mutable.ArrayBuffer
+import storage.StorageLevel
 
 object Bagel extends Logging {
+  val DEFAULT_STORAGE_LEVEL = StorageLevel.MEMORY_ONLY
+
+  /**
+   * Runs a Bagel program.
+   * @param sc [[spark.SparkContext]] to use for the program.
+   * @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the Key will be
+   *                 the vertex id.
+   * @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often this will be an
+   *                 empty array, i.e. sc.parallelize(Array[K, Message]()).
+   * @param combiner [[spark.bagel.Combiner]] combines multiple individual messages to a given vertex into one
+   *                 message before sending (which often involves network I/O).
+   * @param aggregator [[spark.bagel.Aggregator]] performs a reduce across all vertices after each superstep,
+   *                   and provides the result to each vertex in the next superstep.
+   * @param partitioner [[spark.Partitioner]] partitions values by key
+   * @param numPartitions number of partitions across which to split the graph.
+   *                      Default is the default parallelism of the SparkContext
+   * @param storageLevel [[spark.storage.StorageLevel]] to use for caching of intermediate RDDs in each superstep.
+   *                     Defaults to caching in memory.
+   * @param compute function that takes a Vertex, optional set of (possibly combined) messages to the Vertex,
+   *                optional Aggregator and the current superstep,
+   *                and returns a set of (Vertex, outgoing Messages) pairs
+   * @tparam K key
+   * @tparam V vertex type
+   * @tparam M message type
+   * @tparam C combiner
+   * @tparam A aggregator
+   * @return an RDD of (K, V) pairs representing the graph after completion of the program
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest,
           A: Manifest](
     sc: SparkContext,
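Taken together, the added `DEFAULT_STORAGE_LEVEL` and the new scaladoc describe the primary entry point. A hedged sketch of a complete caller is below; `MyVertex`, `MyMessage`, and the do-nothing `compute` body are hypothetical, and only the API shape (including the "empty initial messages" idiom the scaladoc mentions) is taken from this diff:

import spark._
import spark.bagel._

// Hypothetical graph types; any Vertex/Message subclasses would do.
class MyVertex(val active: Boolean, val rank: Double) extends Vertex with Serializable
class MyMessage(val targetId: String, val value: Double) extends Message[String] with Serializable

val sc = new SparkContext("local", "bagel-sketch")
val verts = sc.parallelize(1 to 4).map(i => (i.toString, new MyVertex(true, 1.0)))
// The "empty initial messages" idiom described in the scaladoc:
val msgs = sc.parallelize(Array[(String, MyMessage)]())

// Convenience overload with DefaultCombiner; storageLevel is omitted,
// so the new DEFAULT_STORAGE_LEVEL (MEMORY_ONLY) applies.
val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
  (self: MyVertex, inbox: Option[Array[MyMessage]], superstep: Int) =>
    (new MyVertex(superstep < 9, self.rank), Array[MyMessage]())
}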
@@ -14,7 +44,8 @@ object Bagel extends Logging {
     combiner: Combiner[M, C],
     aggregator: Option[Aggregator[V, A]],
     partitioner: Partitioner,
-    numPartitions: Int
+    numPartitions: Int,
+    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
   )(
     compute: (V, Option[C], Option[A], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
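Because `storageLevel` gets a default value, every pre-existing call site keeps compiling unchanged; only callers that want a different level spell it out. A hedged sketch against the overloads in this diff (`verts`, `msgs`, `combiner`, `partitioner`, and `compute` are hypothetical placeholders with the types the signatures require):

// Unchanged caller: compiles as before and now picks up DEFAULT_STORAGE_LEVEL.
val cached = Bagel.run(sc, verts, msgs, combiner, partitioner, 8)(compute)

// Opt-in caller, using the new overload added below: spills intermediate
// RDDs to disk when they do not fit in memory.
val spilled = Bagel.run(sc, verts, msgs, combiner, partitioner, 8,
  StorageLevel.MEMORY_AND_DISK)(compute)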
@@ -33,7 +64,7 @@ object Bagel extends Logging {
         combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
       val (processed, numMsgs, numActiveVerts) =
-        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep))
+        comp[K, V, M, C](sc, grouped, compute(_, _, aggregated, superstep), storageLevel)
 
       val timeTaken = System.currentTimeMillis - startTime
       logInfo("Superstep %d took %d s".format(superstep, timeTaken / 1000))
@@ -50,6 +81,7 @@ object Bagel extends Logging {
     verts
   }
 
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default storage level */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -59,12 +91,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    partitioner: Partitioner,
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, partitioner, numPartitions)(
+      sc, vertices, messages, combiner, None, partitioner, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]]
+   * and default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -73,13 +122,29 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[C], Int) => (V, Array[M])
+  ): RDD[(K, V)] = run(sc, vertices, messages, combiner, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /** Runs a Bagel program with no [[spark.bagel.Aggregator]] and the default [[spark.HashPartitioner]]*/
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    combiner: Combiner[M, C],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[C], Int) => (V, Array[M])
   ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, C, Nothing](
-      sc, vertices, messages, combiner, None, part, numPartitions)(
+      sc, vertices, messages, combiner, None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, C](compute))
   }
 
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], default [[spark.HashPartitioner]],
+   * [[spark.bagel.DefaultCombiner]] and the default storage level
+   */
   def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
     sc: SparkContext,
     vertices: RDD[(K, V)],
@@ -87,10 +152,24 @@ object Bagel extends Logging {
     numPartitions: Int
   )(
     compute: (V, Option[Array[M]], Int) => (V, Array[M])
-  ): RDD[(K, V)] = {
+  ): RDD[(K, V)] = run(sc, vertices, messages, numPartitions, DEFAULT_STORAGE_LEVEL)(compute)
+
+  /**
+   * Runs a Bagel program with no [[spark.bagel.Aggregator]], the default [[spark.HashPartitioner]]
+   * and [[spark.bagel.DefaultCombiner]]
+   */
+  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
+    sc: SparkContext,
+    vertices: RDD[(K, V)],
+    messages: RDD[(K, M)],
+    numPartitions: Int,
+    storageLevel: StorageLevel
+  )(
+    compute: (V, Option[Array[M]], Int) => (V, Array[M])
+  ): RDD[(K, V)] = {
     val part = new HashPartitioner(numPartitions)
     run[K, V, M, Array[M], Nothing](
-      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions)(
+      sc, vertices, messages, new DefaultCombiner(), None, part, numPartitions, storageLevel)(
       addAggregatorArg[K, V, M, Array[M]](compute))
   }
@@ -117,7 +196,8 @@ object Bagel extends Logging {
   private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
     sc: SparkContext,
     grouped: RDD[(K, (Seq[C], Seq[V]))],
-    compute: (V, Option[C]) => (V, Array[M])
+    compute: (V, Option[C]) => (V, Array[M]),
+    storageLevel: StorageLevel
   ): (RDD[(K, (V, Array[M]))], Int, Int) = {
     var numMsgs = sc.accumulator(0)
     var numActiveVerts = sc.accumulator(0)
@@ -135,7 +215,7 @@ object Bagel extends Logging {
         numActiveVerts += 1
         Some((newVert, newMsgs))
-      }.cache
+      }.persist(storageLevel)
 
       // Force evaluation of processed RDD for accurate performance measurements
       processed.foreach(x => {})
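The `.cache` to `.persist(storageLevel)` swap is the core of the change: in Spark, `cache` is shorthand for persisting at `StorageLevel.MEMORY_ONLY`, so threading the level through `comp` preserves the old behavior whenever the caller omits it. A one-line sketch of the equivalence, with `rdd` as a hypothetical placeholder:

// Equivalent in Spark: the default persistence level is MEMORY_ONLY,
// which is exactly the new Bagel.DEFAULT_STORAGE_LEVEL.
rdd.cache
rdd.persist(StorageLevel.MEMORY_ONLY)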
@@ -166,6 +246,7 @@ trait Aggregator[V, A] {
   def mergeAggregators(a: A, b: A): A
 }
 
+/** Default combiner that simply appends messages together (i.e. performs no aggregation) */
 class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
   def createCombiner(msg: M): Array[M] =
     Array(msg)
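As the new comment says, `DefaultCombiner` only accumulates raw messages; the point of `Combiner` is to let a program shrink them before the shuffle. A hedged sketch of a hand-written combiner (the `ScoreMessage` type is hypothetical and not part of this commit; the three method names come from the `combineByKey` call shown earlier in this diff):

// Hypothetical message type carrying a numeric payload.
class ScoreMessage(val targetId: String, val score: Double)
  extends Message[String] with Serializable

// Collapses all messages bound for one vertex into a single running sum,
// instead of shipping every message the way DefaultCombiner would.
class SumCombiner extends Combiner[ScoreMessage, Double] with Serializable {
  def createCombiner(msg: ScoreMessage): Double = msg.score
  def mergeMsg(combiner: Double, msg: ScoreMessage): Double = combiner + msg.score
  def mergeCombiners(a: Double, b: Double): Double = a + b
}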
bagel/src/test/scala/bagel/BagelSuite.scala (+18 −0)
@@ -7,6 +7,7 @@ import org.scalatest.time.SpanSugar._
 import scala.collection.mutable.ArrayBuffer
 
 import spark._
+import storage.StorageLevel
 
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
@@ -79,4 +80,21 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
       }
     }
   }
+
+  test("using non-default persistence level") {
+    failAfter(10 seconds) {
+      sc = new SparkContext("local", "test")
+      val verts = sc.parallelize((1 to 4).map(id => (id.toString, new TestVertex(true, 0))))
+      val msgs = sc.parallelize(Array[(String, TestMessage)]())
+      val numSupersteps = 50
+      val result =
+        Bagel.run(sc, verts, msgs, sc.defaultParallelism, StorageLevel.DISK_ONLY) {
+          (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
+            (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
+        }
+      for ((id, vert) <- result.collect) {
+        assert(vert.age === numSupersteps)
+      }
+    }
+  }
 }
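The new test pins `StorageLevel.DISK_ONLY` to exercise the non-default path; the suite's pre-existing tests cover the default path by omitting the level entirely. For contrast, a sketch reusing the test's own fixtures:

// Sketch only: the same program as the new test, but relying on the
// default (MEMORY_ONLY) persistence level by omitting StorageLevel.
val result = Bagel.run(sc, verts, msgs, sc.defaultParallelism) {
  (self: TestVertex, msgs: Option[Array[TestMessage]], superstep: Int) =>
    (new TestVertex(superstep < numSupersteps - 1, self.age + 1), Array[TestMessage]())
}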