diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index e6afd092be5a8ffbd27902b9124f643d26930d3a..c82c3d73587591e9dab1da35324c1e1f73141c6b 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -478,24 +478,26 @@ def mapReduceTriplets[A]( The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which is applied to each triplet and can yield *messages* destined to either (none or both) vertices in -the triplet. We currently only support messages destined to the source or destination vertex of the -triplet to enable optimized preaggregation. The user defined `reduce` function combines the +the triplet. To facilitate optimized pre-aggregation, we currently only support messages destined +to the source or destination vertex of the triplet. The user defined `reduce` function combines the messages destined to each vertex. The `mapReduceTriplets` operator returns a `VertexRDD[A]` -containing the aggregate message to each vertex. Vertices that do not receive a message are not -included in the returned `VertexRDD`. +containing the aggregate message (of type `A`) destined to each vertex. Vertices that do not +receive a message are not included in the returned `VertexRDD`. -> Note that `mapReduceTriplets takes an additional optional `activeSet` (see API docs) which +> Note that `mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which > restricts the map phase to edges adjacent to the vertices in the provided `VertexRDD`. Restricting > computation to triplets adjacent to a subset of the vertices is often necessary in incremental > iterative computation and is a key part of the GraphX implementation of Pregel. -We can use the `mapReduceTriplets` operator to collect information about adjacent vertices. For -example if we wanted to compute the average age of followers who are older that each user we could -do the following. +In the following example we use the `mapReduceTriplets` operator to compute the average age of the +more senior followers of each user. {% highlight scala %} -// Graph with age as the vertex property -val graph: Graph[Double, String] = getFromSomewhereElse() +// Import Random graph generation library +import org.apache.spark.graphx.util.GraphGenerators +// Create a graph with "age" as the vertex property. Here we use a random graph for simplicity. +val graph: Graph[Double, Int] = + GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble ) // Compute the number of older followers and their total age val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)]( triplet => { // Map Function @@ -511,13 +513,16 @@ val olderFollowers: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Dou (a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function ) // Divide total age by number of older followers to get average age of older followers -val avgAgeOlderFollowers: VertexRDD[Double] = - olderFollowers.mapValues { case (count, totalAge) => totalAge / count } +val avgAgeOfOlderFollowers: VertexRDD[Double] = + olderFollowers.mapValues( (id, value) => value match { case (count, totalAge) => totalAge / count } ) +// Display the results +avgAgeOfOlderFollowers.collect.foreach(println(_)) {% endhighlight %} > Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums) > are constant sized (e.g., floats and addition instead of lists and concatenation). More -> precisely, the result of `mapReduceTriplets` should be sub-linear in the degree of each vertex. +> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each +> vertex. ### Computing Degree Information @@ -529,13 +534,13 @@ compute the max in, out, and total degrees: {% highlight scala %} // Define a reduce operation to compute the highest degree vertex -def maxReduce(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { +def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = { if (a._2 > b._2) a else b } // Compute the max degrees -val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(maxReduce) -val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(maxReduce) -val maxDegrees: (VertexId, Int) = graph.degrees.reduce(maxReduce) +val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max) +val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max) +val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max) {% endhighlight %} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index c21f8935d9c5f23674beacc86550492fb96e415f..916eb9763c70f0be6fd488163b52766b69ee714d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -32,6 +32,9 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( @transient val replicatedVertexView: ReplicatedVertexView[VD]) extends Graph[VD, ED] with Serializable { + /** Default construct is provided to support serialization */ + protected def this() = this(null, null, null, null) + /** Return a RDD that brings edges together with their source and destination vertices. */ @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { val vdTag = classTag[VD]