Commit b715933f authored by Alexander Ulanov, committed by Ankur Dave

[SPARK-9436] [GRAPHX] Pregel simplification patch

Pregel code contains two consecutive joins:
```
g.vertices.innerJoin(messages)(vprog)
...
g = g.outerJoinVertices(newVerts)
{ (vid, old, newOpt) => newOpt.getOrElse(old) }
```
This can be simplified to a single join. ankurdave proposed a patch based on our discussion on the mailing list: https://www.mail-archive.com/dev@spark.apache.org/msg10316.html
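As a rough illustration of why the two joins collapse into one, here is a minimal sketch that models the vertex table and the message table as plain Scala `Map`s instead of GraphX RDDs. The names `PregelJoinSketch`, `twoJoins`, and `oneJoin` are hypothetical and not part of Spark; the sketch only assumes that `joinVertices` applies the function to vertices that have a matching entry and leaves the other vertices unchanged.

```scala
// Plain-collection model of the vertex update step (no Spark dependency).
// All names here are illustrative and do not exist in GraphX.
object PregelJoinSketch {
  type VertexId = Long

  // Models the old code: innerJoin to build newVerts, then outerJoinVertices.
  def twoJoins[VD, M](
      vertices: Map[VertexId, VD],
      messages: Map[VertexId, M])(
      vprog: (VertexId, VD, M) => VD): Map[VertexId, VD] = {
    // g.vertices.innerJoin(messages)(vprog): only vertices that got a message.
    val newVerts: Map[VertexId, VD] = vertices.collect {
      case (vid, attr) if messages.contains(vid) =>
        vid -> vprog(vid, attr, messages(vid))
    }
    // g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
    vertices.map { case (vid, old) => vid -> newVerts.getOrElse(vid, old) }
  }

  // Models the new code: a single join that falls back to the old attribute.
  def oneJoin[VD, M](
      vertices: Map[VertexId, VD],
      messages: Map[VertexId, M])(
      vprog: (VertexId, VD, M) => VD): Map[VertexId, VD] =
    vertices.map { case (vid, old) =>
      vid -> messages.get(vid).map(msg => vprog(vid, old, msg)).getOrElse(old)
    }

  def main(args: Array[String]): Unit = {
    val vertices = Map(1L -> 10, 2L -> 20, 3L -> 30)
    val messages = Map(1L -> 5, 3L -> 7) // vertex 2 receives no message
    val vprog = (vid: VertexId, attr: Int, msg: Int) => attr + msg

    val before = twoJoins(vertices, messages)(vprog)
    val after = oneJoin(vertices, messages)(vprog)
    assert(before == after) // both strategies yield the same vertex table
    println(after)
  }
}
```

In this model the key set of `newVerts` equals the set of vertices that received a message, which suggests why `messages` itself can stand in for `newVerts` as the active set, provided messages are only ever addressed to vertices that exist in the graph.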

Author: Alexander Ulanov <nashb@yandex.ru>

Closes #7749 from avulanov/SPARK-9436-pregel and squashes the following commits:

8568e06 [Alexander Ulanov] Pregel simplification patch
parent 5340dfaf
@@ -127,28 +127,25 @@ object Pregel extends Logging {
     var prevG: Graph[VD, ED] = null
     var i = 0
     while (activeMessages > 0 && i < maxIterations) {
-      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
-      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-      // Update the graph with the new vertices.
+      // Receive the messages and update the vertices.
       prevG = g
-      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-      g.cache()
+      g = g.joinVertices(messages)(vprog).cache()
 
       val oldMessages = messages
-      // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
-      // get to send messages. We must cache messages so it can be materialized on the next line,
-      // allowing us to uncache the previous iteration.
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
-      // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
-      // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
-      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      // Send new messages, skipping edges where neither side received a message. We must cache
+      // messages so it can be materialized on the next line, allowing us to uncache the previous
+      // iteration.
+      messages = g.mapReduceTriplets(
+        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+      // and the vertices of g).
       activeMessages = messages.count()
 
       logInfo("Pregel finished iteration " + i)
 
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking = false)
-      newVerts.unpersist(blocking = false)
       prevG.unpersistVertices(blocking = false)
       prevG.edges.unpersist(blocking = false)
       // count the iteration