Skip to content
Snippets Groups Projects
Commit 59e4384e authored by Ankur Dave's avatar Ankur Dave
Browse files

Fix Pregel SSSP example in programming guide

parent c6023bee
No related branches found
No related tags found
No related merge requests found
...@@ -511,7 +511,7 @@ In the following example we use the `mapReduceTriplets` operator to compute the ...@@ -511,7 +511,7 @@ In the following example we use the `mapReduceTriplets` operator to compute the
more senior followers of each user. more senior followers of each user.
{% highlight scala %} {% highlight scala %}
// Import Random graph generation library // Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity. // Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
val graph: Graph[Double, Int] = val graph: Graph[Double, Int] =
...@@ -643,18 +643,23 @@ iterations, and the edge direction in which to send messages (by default along o ...@@ -643,18 +643,23 @@ iterations, and the edge direction in which to send messages (by default along o
second argument list contains the user defined functions for receiving messages (the vertex program second argument list contains the user defined functions for receiving messages (the vertex program
`vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`. `vprog`), computing messages (`sendMsg`), and combining messages `mergeMsg`.
We can use the Pregel operator to express computation such single source shortest path in the We can use the Pregel operator to express computation such as single source
following example. shortest path in the following example.
{% highlight scala %} {% highlight scala %}
val graph: Graph[String, Double] // A graph with edge attributes containing distances import org.apache.spark.graphx._
val sourceId: VertexId = 42 // The ultimate source // Import random graph generation library
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Int, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexID = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity. // Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) => if (id == shourceId) 0.0 else Double.PositiveInfinity) val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)( val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist) // Vertex Program (id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message triplet => { // Send Message
if(triplet.srcAttr + triplet.attr < triplet.dstAttr) { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else { } else {
Iterator.empty Iterator.empty
...@@ -662,6 +667,7 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)( ...@@ -662,6 +667,7 @@ val sssp = initialGraph.pregel(Double.PositiveInfinity)(
}, },
(a,b) => math.min(a,b) // Merge Message (a,b) => math.min(a,b) // Merge Message
) )
println(sssp.vertices.collect.mkString("\n"))
{% endhighlight %} {% endhighlight %}
# Graph Builders # Graph Builders
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment