Commit 0bba7738 authored by Joseph E. Gonzalez

Additional edits for clarity in the graphx programming guide.

parent 3fcc68bf

@@ -108,7 +108,7 @@ with user defined objects attached to each vertex and edge. A directed multigra
graph with potentially multiple parallel edges sharing the same source and destination vertex. The
ability to support parallel edges simplifies modeling scenarios where there can be multiple
relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a
*unique* 64-bit long identifier (`VertexID`). GraphX does not impose any ordering constraints on
the vertex identifiers. Similarly, edges have corresponding source and destination vertex
identifiers.

@@ -149,12 +149,12 @@ class Graph[VD, ED] {
}
{% endhighlight %}

The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID,
VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional
functionality built around graph computation and leverage internal optimizations. We discuss the
`VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge
RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form:
`RDD[(VertexID, VD)]` and `RDD[Edge[ED]]`.

### Example Property Graph

@@ -201,7 +201,7 @@ val graph = Graph(users, relationships, defaultUser)
In the above example we make use of the [`Edge`][Edge] case class. Edges have a `srcId` and a
`dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge`
class has an `attr` member which stores the edge property.

[Edge]: api/graphx/index.html#org.apache.spark.graphx.Edge
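
> As a quick, hedged illustration of these members, we can construct and inspect one of the
> edges from the example graph:
> {% highlight scala %}
val edge = Edge(3L, 7L, "collab")
val src: VertexID = edge.srcId // 3L, the source vertex identifier
val dst: VertexID = edge.dstId // 7L, the destination vertex identifier
val rel: String = edge.attr    // "collab", the edge property
{% endhighlight %}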

@@ -217,7 +217,7 @@ graph.edges.filter(e => e.srcId > e.dstId).count
{% endhighlight %}

> Note that `graph.vertices` returns a `VertexRDD[(String, String)]` which extends
> `RDD[(VertexID, (String, String))]` and so we use the Scala `case` expression to deconstruct the
> tuple. On the other hand, `graph.edges` returns an `EdgeRDD` containing `Edge[String]` objects.
> We could have also used the case class type constructor as in the following:
> {% highlight scala %}

@@ -284,6 +284,75 @@ able to support different graph representations in the future. Each graph repre
provide implementations of the core operations and reuse many of the useful operations defined in
[`GraphOps`][GraphOps].

### Summary List of Operators
The following is a quick summary of the functionality defined in both [`Graph`][Graph] and
[`GraphOps`][GraphOps] but presented as members of `Graph` for simplicity. Note that some function
signatures have been simplified (e.g., default arguments and type constraints removed) and some more
advanced functionality has been omitted, so please consult the API docs for the official list of
operations.
{% highlight scala %}
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic =============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexID, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
      (mapFunc: (VertexID, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
  def mapReduceTriplets[A: ClassTag](
      mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
      reduceFunc: (A, A) => A,
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
    : VertexRDD[A]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexID, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED]
}
{% endhighlight %}

## Property Operators

In direct analogy to the RDD `map` operator, the property

@@ -443,7 +512,7 @@ original value.
> is therefore recommended that the input RDD be first made unique using the following which will
> also *pre-index* the resulting values to substantially accelerate the subsequent join.
> {% highlight scala %}
val nonUniqueCosts: RDD[(VertexID, Double)]
val uniqueCosts: VertexRDD[Double] =
  graph.vertices.aggregateUsingIndex(nonUniqueCosts, (a, b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(

@@ -475,7 +544,7 @@ val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt)
> provide type annotation for the user defined function:
> {% highlight scala %}
val joinedGraph = graph.joinVertices(uniqueCosts)(
  (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost)
{% endhighlight %}

@@ -513,26 +582,26 @@ containing the aggregate message (of type `A`) destined to each vertex. Vertice
receive a message are not included in the returned `VertexRDD`.

<blockquote>
<p>Note that <code>mapReduceTriplets</code> takes an additional optional <code>activeSet</code>
(not shown above; see API docs for details) which restricts the map phase to edges adjacent to the
vertices in the provided <code>VertexRDD</code>:</p>

{% highlight scala %}
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
{% endhighlight %}

<p>The EdgeDirection specifies which edges adjacent to the vertex set are included in the map
phase. If the direction is <code>In</code>, then the user defined <code>map</code> function will
be run only on edges with the destination vertex in the active set. If the direction is
<code>Out</code>, then the <code>map</code> function will be run only on edges originating from
vertices in the active set. If the direction is <code>Either</code>, then the <code>map</code>
function will be run only on edges with <i>either</i> vertex in the active set. If the direction is
<code>Both</code>, then the <code>map</code> function will be run only on edges with both vertices
in the active set. The active set must be derived from the set of vertices in the graph.
Restricting computation to triplets adjacent to a subset of the vertices is often necessary in
incremental iterative computation and is a key part of the GraphX implementation of Pregel.</p>
</blockquote>
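
> As a hedged sketch of the active set restriction (the `updatedVertices` set here is a
> hypothetical `VertexRDD` of recently changed vertices), an incremental computation could
> recount followers of only those vertices:
> {% highlight scala %}
// Run the map phase only on edges whose destination vertex appears in
// updatedVertices; all other triplets are skipped.
val msgs: VertexRDD[Int] = graph.mapReduceTriplets[Int](
  triplet => Iterator((triplet.dstId, 1)),
  (a, b) => a + b,
  activeSetOpt = Some((updatedVertices, EdgeDirection.In)))
{% endhighlight %}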
In the following example we use the `mapReduceTriplets` operator to compute the average age of the

@@ -565,8 +634,8 @@ val avgAgeOfOlderFollowers: VertexRDD[Double] =
avgAgeOfOlderFollowers.collect.foreach(println(_))
{% endhighlight %}

> Note that the `mapReduceTriplets` operation performs optimally when the messages (and the sums of
> messages) are constant sized (e.g., floats and addition instead of lists and concatenation). More
> precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each
> vertex.
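
> To make that concrete, here is a hedged sketch (assuming a graph with `Double` vertex
> attributes, as in the example above): the message is a fixed-size `(count, sum)` pair, so the
> reduce stays cheap even at high-degree vertices, whereas concatenating neighbor lists would
> grow linearly with degree.
> {% highlight scala %}
// Each message is a constant-size (count, sum) pair combined by addition.
val countSum: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)](
  triplet => Iterator((triplet.dstId, (1, triplet.srcAttr))),
  (a, b) => (a._1 + b._1, a._2 + b._2))
{% endhighlight %}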

@@ -779,16 +848,16 @@ respectively. In this section we review some of the additional useful functiona

## VertexRDDs

The `VertexRDD[A]` extends `RDD[(VertexID, A)]` and adds the additional constraint that each
`VertexID` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of vertices each with an
attribute of type `A`. Internally, this is achieved by storing the vertex attributes in a reusable
hash-map data-structure. As a consequence, if two `VertexRDD`s are derived from the same base
`VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant time without hash
evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the following
additional functionality:

{% highlight scala %}
class VertexRDD[VD] extends RDD[(VertexID, VD)] {
  // Filter the vertex set but preserve the internal index
  def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD]
  // Transform the values without changing the ids (preserves the internal index)
@@ -807,15 +876,14 @@ class VertexRDD[VD] {
Notice, for example, how the `filter` operator returns a `VertexRDD`. Filter is actually
implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins
with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to
change the `VertexID` thereby enabling the same `HashMap` data-structures to be reused. Both the
`leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same
`HashMap` and implement the join by linear scan rather than costly point lookups.
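
> As a hedged sketch of this index reuse (assuming a `SparkContext` named `sc`), two
> `VertexRDD`s filtered from the same base share its index, so the `innerJoin` below can be
> implemented as a linear scan:
> {% highlight scala %}
val base: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val evens: VertexRDD[Int] = base.filter { case (id, _) => id % 2 == 0 }
val bigIds: VertexRDD[Int] = base.filter { case (id, _) => id >= 50L }
// Joined without hash evaluations because both sides reuse base's index.
val evenBigIds: VertexRDD[Int] = evens.innerJoin(bigIds)((id, a, b) => a + b)
{% endhighlight %}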

The `aggregateUsingIndex` operator is useful for efficient construction of a new `VertexRDD` from an
`RDD[(VertexID, A)]`. Conceptually, if I have constructed a `VertexRDD[B]` over a set of vertices
which is a *super-set* of the vertices in some `RDD[(VertexID, A)]`, then I can reuse the index to
both aggregate and then subsequently index the `RDD[(VertexID, A)]`. For example:

{% highlight scala %}
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
@@ -831,10 +899,10 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)

## EdgeRDDs

The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]`, organizes the edges in blocks partitioned using
one of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy].
Within each partition, edge attributes and adjacency structure are stored separately, enabling
maximum reuse when changing attribute values.

[PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy

@@ -849,7 +917,7 @@ def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) =>
{% endhighlight %}

In most applications we have found that operations on the `EdgeRDD` are accomplished through the
graph operators or rely on operations defined in the base `RDD` class.
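
> For instance (a hedged sketch reusing the `graph` from the example property graph), an edge
> transformation typically goes through a graph operator such as `mapEdges`, which preserves the
> optimized representation, while ordinary `RDD` methods still work on the edge values:
> {% highlight scala %}
// Replace every String edge attribute with a unit weight via the graph operator.
val weighted: Graph[(String, String), Double] = graph.mapEdges(e => 1.0)
// Plain RDD operations on graph.edges also work.
val numSelfLoops = graph.edges.filter(e => e.srcId == e.dstId).count
{% endhighlight %}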

# Optimized Representation

@@ -871,7 +939,9 @@ reduce both the communication and storage overhead. Logically, this corresponds
to machines and allowing vertices to span multiple machines. The exact method of assigning edges
depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the
various heuristics. Users can choose between different strategies by repartitioning the graph with
the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning strategy is to use
the initial partitioning of the edges as provided on graph construction. However, users can easily
switch to 2D-partitioning or other heuristics included in GraphX.

[Graph.partitionBy]: api/graphx/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED]
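
> As a hedged sketch, switching an existing `graph` to the 2D partitioning heuristic is a single
> call:
> {% highlight scala %}
// Repartition the edges using the 2D edge partitioning strategy.
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
{% endhighlight %}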

@@ -885,16 +955,15 @@ the [`Graph.partitionBy`][Graph.partitionBy] operator.
Once the edges have been partitioned, the key challenge to efficient graph-parallel computation is
efficiently joining vertex attributes with the edges. Because real-world graphs typically have more
edges than vertices, we move vertex attributes to the edges. Because not all partitions will
contain edges adjacent to all vertices, we internally maintain a routing table which identifies
where to broadcast vertices when implementing the join required for operations like `triplets` and
`mapReduceTriplets`.

# Graph Algorithms
<a name="graph_algorithms"></a>

GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used.
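
> As a hedged illustration of this calling convention (types per the operator summary above):
> {% highlight scala %}
// Each algorithm is invoked directly as a method on the graph via GraphOps.
val pagerankGraph = graph.pageRank(0.001)  // Graph[Double, Double]
val ccGraph = graph.connectedComponents()  // Graph[VertexID, ED]
val triangleGraph = graph.triangleCount()  // Graph[Int, ED]
{% endhighlight %}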

## PageRank
<a name="pagerank"></a>