Commit cfe4a29d authored by Joseph E. Gonzalez

Improvements in example code for the programming guide as well as adding serialization support for GraphImpl to address issues with failed closure capture.
parent ae4b75d9
@@ -478,24 +478,26 @@ def mapReduceTriplets[A](
 The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which
 is applied to each triplet and can yield *messages* destined to either (none or both) vertices in
-the triplet. We currently only support messages destined to the source or destination vertex of the
-triplet to enable optimized preaggregation. The user defined `reduce` function combines the
+the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined
+to the source or destination vertex of the triplet. The user defined `reduce` function combines the
 messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]`
-containing the aggregate message to each vertex. Vertices that do not receive a message are not
-included in the returned `VertexRDD`.
+containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not
+receive a message are not included in the returned `VertexRDD`.
-> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which
+> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which
 > restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting
 > computation to triplets adjacent to a subset of the vertices is often necessary in incremental
 > iterative computation and is a key part of the GraphX implementation of Pregel.
-We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For
-example if we wanted to compute the average age of followers who are older that each user we could
-do the following.
+In the following example we use the `mapReduceTriplets` operator to compute the average age of the
+more senior followers of each user.
 {% highlight scala %}
-// Graph with age as the vertex property
-val graph: Graph[Double, String] = getFromSomewhereElse()
+// Import Random graph generation library
+import org.apache.spark.graphx.util.GraphGenerators
+// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity.
+val graph: Graph[Double, Int] =
+GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
 // Compute the number of older followers and their total age
 val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
 triplet => { // Map Function
@@ -511,13 +513,16 @@ val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Dou
 (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
 )
 // Divide total age by number of older followers to get average age of older followers
-val avgAgeOlderFollowers: VertexRDD[Double] =
-olderFollowers.mapValues { case (count, totalAge) => totalAge / count }
+val avgAgeOfOlderFollowers: VertexRDD[Double] =
+olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } )
+// Display the results
+avgAgeOfOlderFollowers.collect.foreach(println(_))
 {% endhighlight %}
 > Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums)
 > are constant sized (e.g., floats and addition instead of lists and concatenation). More
-> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex.
+> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
+> vertex.
 ### Computing Degree Information
@@ -529,13 +534,13 @@ compute the max in, out, and total degrees:
 {% highlight scala %}
 // Define a reduce operation to compute the highest degree vertex
-def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
+def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = {
 if (a._2 > b._2) a else b
 }
 // Compute the max degrees
-val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce)
-val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce)
-val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce)
+val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max)
+val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max)
+val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max)
 {% endhighlight %}
......
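The map/reduce semantics that the guide text describes for `mapReduceTriplets` can be sketched with plain Scala collections, without Spark. This is only an illustration under stated assumptions: `Triplet`, `OlderFollowersSketch`, `mapPhase`, and `reducePhase` are hypothetical names, not GraphX API, and because the body of the guide's map function is elided in the diff, the rule used here (message the destination vertex when the follower is older) is an assumption.

```scala
// Hypothetical stand-in for a GraphX edge triplet: a follower (src) and the user followed (dst).
final case class Triplet(srcId: Long, srcAge: Double, dstId: Long, dstAge: Double)

object OlderFollowersSketch {
  type Msg = (Int, Double) // (number of older followers, their total age)

  // Map phase (assumed rule): message the destination only when the follower is older.
  def mapPhase(t: Triplet): Iterator[(Long, Msg)] =
    if (t.srcAge > t.dstAge) Iterator((t.dstId, (1, t.srcAge))) else Iterator.empty

  // Reduce phase: the same pairwise sum as the reduce function shown in the guide.
  def reducePhase(a: Msg, b: Msg): Msg = (a._1 + b._1, a._2 + b._2)

  // Group messages by target vertex, reduce each group, then divide total age by count.
  def avgAgeOfOlderFollowers(triplets: Seq[Triplet]): Map[Long, Double] =
    triplets
      .flatMap(mapPhase)
      .groupBy(_._1)
      .map { case (id, msgs) =>
        val (count, totalAge) = msgs.map(_._2).reduce(reducePhase)
        id -> (totalAge / count)
      }

  def main(args: Array[String]): Unit = {
    val triplets = Seq(
      Triplet(1L, 40.0, 3L, 30.0), // 1 follows 3 and is older
      Triplet(2L, 50.0, 3L, 30.0), // 2 follows 3 and is older
      Triplet(3L, 30.0, 1L, 40.0)  // 3 follows 1 but is younger: no message
    )
    // Vertex 3 averages 45.0; vertex 1 receives no message and is absent from the result.
    avgAgeOfOlderFollowers(triplets).foreach(println)
  }
}
```

As in the guide's description, a vertex that receives no message does not appear in the result, and each message is a constant-sized pair, so the reduce step never grows with vertex degree.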
@@ -32,6 +32,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 @transient val replicatedVertexView: ReplicatedVertexView[VD])
 extends Graph[VD, ED] with Serializable {
+/** Default construct is provided to support serialization */
+protected def this() = this(null, null, null, null)
 /** Return a RDD that brings edges together with their source and destination vertices. */
 @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = {
 val vdTag = classTag[VD]
......