From 6f6f8c928ce493357d4d32e46971c5e401682ea8 Mon Sep 17 00:00:00 2001 From: Ankur Dave <ankurdave@gmail.com> Date: Mon, 13 Jan 2014 21:55:35 -0800 Subject: [PATCH] Wrap methods in the appropriate class/object declaration --- docs/graphx-programming-guide.md | 149 ++++++++++++++++++------------- 1 file changed, 85 insertions(+), 64 deletions(-) diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index aadeb38960..29d397c371 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -256,7 +256,7 @@ compute the in-degree of each vertex (defined in `GraphOps`) by the following: {% highlight scala %} val graph: Graph[(String, String), String] // Use the implicit GraphOps.inDegrees operator -val indDegrees: VertexRDD[Int] = graph.inDegrees +val inDegrees: VertexRDD[Int] = graph.inDegrees {% endhighlight %} The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be @@ -270,9 +270,11 @@ In direct analogy to the RDD `map` operator, the property graph contains the following: {% highlight scala %} -def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] -def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] -def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] +class Graph[VD, ED] { + def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] + def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] +} {% endhighlight %} Each of these operators yields a new graph with the vertex or edge properties modified by the user @@ -314,11 +316,13 @@ Currently GraphX supports only a simple set of commonly used structural operator add more in the future. The following is a list of the basic structural operators. {% highlight scala %} -def reverse: Graph[VD, ED] -def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, - vpred: (VertexID, VD) => Boolean): Graph[VD, ED] -def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] -def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] +class Graph[VD, ED] { + def reverse: Graph[VD, ED] + def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, + vpred: (VertexID, VD) => Boolean): Graph[VD, ED] + def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] + def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] +} {% endhighlight %} The [`reverse`][Graph.reverse] operator returns a new graph with all the edge directions reversed. @@ -400,10 +404,12 @@ might want to pull vertex properties from one graph into another. These tasks c using the *join* operators. Below we list the key join operators: {% highlight scala %} -def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD) - : Graph[VD, ED] -def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2) - : Graph[VD2, ED] +class Graph[VD, ED] { + def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD) + : Graph[VD, ED] + def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2) + : Graph[VD2, ED] +} {% endhighlight %} The [`joinVertices`][GraphOps.joinVertices] operator joins the vertices with the input RDD and @@ -470,10 +476,12 @@ The core (heavily optimized) aggregation primitive in GraphX is the [`mapReduceTriplets`][Graph.mapReduceTriplets] operator: {% highlight scala %} -def mapReduceTriplets[A]( - map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], - reduce: (A, A) => A) - : VertexRDD[A] +class Graph[VD, ED] { + def mapReduceTriplets[A]( + map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + reduce: (A, A) => A) + : VertexRDD[A] +} {% endhighlight %} The [`mapReduceTriplets`][Graph.mapReduceTriplets] operator takes a user defined map function which @@ -564,12 +572,19 @@ val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max) ### Collecting Neighbors In some cases it may be easier to express computation by collecting neighboring vertices and their -attributes at each vertex. This can be easily accomplished using the `collectNeighborIds` and the -`collectNeighbors` operators. +attributes at each vertex. This can be easily accomplished using the +[`collectNeighborIds`][GraphOps.collectNeighborIds] and the +[`collectNeighbors`][GraphOps.collectNeighbors] operators. + +[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]] +[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]] + {% highlight scala %} -def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] = -def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ] +class GraphOps[VD, ED] { + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ] +} {% endhighlight %} > Note that these operators can be quite costly as they duplicate information and require @@ -600,40 +615,44 @@ messages remaining. > neighboring vertices and the message construction is done in parallel using a user defined > messaging function. These constraints allow additional optimization within GraphX. -The following is type signature of the Pregel operator as well as a *sketch* of its implementation -(note calls to graph.cache have been removed): +The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* +of its implementation (note calls to graph.cache have been removed): + +[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)⇒VD,(EdgeTriplet[VD,ED])⇒Iterator[(VertexID,A)],(A,A)⇒A)(ClassTag[A]):Graph[VD,ED] {% highlight scala %} -def pregel[A] - (initialMsg: A, - maxIter: Int = Int.MaxValue, - activeDir: EdgeDirection = EdgeDirection.Out) - (vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], - mergeMsg: (A, A) => A) - : Graph[VD, ED] = { - // Receive the initial message at each vertex - var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() - // compute the messages - var messages = g.mapReduceTriplets(sendMsg, mergeMsg) - var activeMessages = messages.count() - // Loop until no messages remain or maxIterations is achieved - var i = 0 - while (activeMessages > 0 && i < maxIterations) { - // Receive the messages: ----------------------------------------------------------------------- - // Run the vertex program on all vertices that receive messages - val newVerts = g.vertices.innerJoin(messages)(vprog).cache() - // Merge the new vertex values back into the graph - g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() - // Send Messages: ------------------------------------------------------------------------------ - // Vertices that didn't receive a message above don't appear in newVerts and therefore don't - // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked - // on edges in the activeDir of vertices in newVerts - messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() - activeMessages = messages.count() - i += 1 +class GraphOps[VD, ED] { + def pregel[A] + (initialMsg: A, + maxIter: Int = Int.MaxValue, + activeDir: EdgeDirection = EdgeDirection.Out) + (vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] = { + // Receive the initial message at each vertex + var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache() + // compute the messages + var messages = g.mapReduceTriplets(sendMsg, mergeMsg) + var activeMessages = messages.count() + // Loop until no messages remain or maxIterations is achieved + var i = 0 + while (activeMessages > 0 && i < maxIterations) { + // Receive the messages: ----------------------------------------------------------------------- + // Run the vertex program on all vertices that receive messages + val newVerts = g.vertices.innerJoin(messages)(vprog).cache() + // Merge the new vertex values back into the graph + g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache() + // Send Messages: ------------------------------------------------------------------------------ + // Vertices that didn't receive a message above don't appear in newVerts and therefore don't + // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked + // on edges in the activeDir of vertices in newVerts + messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache() + activeMessages = messages.count() + i += 1 + } + g } - g } {% endhighlight %} @@ -749,18 +768,20 @@ time without hash evaluations. To leverage this indexed data-structure, the `Ver following additional functionality: {% highlight scala %} -// Filter the vertex set but preserves the internal index -def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] -// Transform the values without changing the ids (preserves the internal index) -def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] -def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2] -// Remove vertices from this set that appear in the other set -def diff(other: VertexRDD[VD]): VertexRDD[VD] -// Join operators that take advantage of the internal indexing to accelerate joins (substantially) -def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] -def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] -// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. -def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] +class VertexRDD[VD] { + // Filter the vertex set but preserves the internal index + def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] + // Transform the values without changing the ids (preserves the internal index) + def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] + def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2] + // Remove vertices from this set that appear in the other set + def diff(other: VertexRDD[VD]): VertexRDD[VD] + // Join operators that take advantage of the internal indexing to accelerate joins (substantially) + def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] + def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] + // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. + def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] +} {% endhighlight %} Notice, for example, how the `filter` operator returns an `VertexRDD`. Filter is actually -- GitLab