    0bd7cd18
    [SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming... · 0bd7cd18
    WeichenXu authored
    [SPARK-16345][DOCUMENTATION][EXAMPLES][GRAPHX] Extract graphx programming guide example snippets from source files instead of hard code them
    
    ## What changes were proposed in this pull request?
    
    I extract 6 example programs from GraphX programming guide and replace them with
    `include_example` label.
    
    The 6 example programs are:
    - AggregateMessagesExample.scala
    - SSSPExample.scala
    - TriangleCountingExample.scala
    - ConnectedComponentsExample.scala
    - ComprehensiveExample.scala
    - PageRankExample.scala
    
    All the example code can run using
    `bin/run-example graphx.EXAMPLE_NAME`
    
    ## How was this patch tested?
    
    Manual.
    
    Author: WeichenXu <WeichenXu123@outlook.com>
    
    Closes #14015 from WeichenXu123/graphx_example_plugin.
graphx-programming-guide.md 52.61 KiB
layout: global
displayTitle: GraphX Programming Guide
title: GraphX
description: GraphX graph processing library guide for Spark SPARK_VERSION_SHORT
  • This will become a table of contents (this text will be scraped). {:toc}

GraphX

Overview

GraphX is a new component in Spark for graphs and graph-parallel computation. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with properties attached to each vertex and edge. To support graph computation, GraphX exposes a set of fundamental operators (e.g., subgraph, joinVertices, and aggregateMessages) as well as an optimized variant of the Pregel API. In addition, GraphX includes a growing collection of graph algorithms and builders to simplify graph analytics tasks.

Migrating from Spark 1.1

GraphX in Spark 1.2 contains a few user-facing API changes:

  1. To improve performance we have introduced a new version of mapReduceTriplets called aggregateMessages which takes the messages previously returned from mapReduceTriplets through a callback (EdgeContext) rather than by return value. We are deprecating mapReduceTriplets and encourage users to consult the transition guide.

  2. In Spark 1.0 and 1.1, the type signature of EdgeRDD switched from EdgeRDD[ED] to EdgeRDD[ED, VD] to enable some caching optimizations. We have since discovered a more elegant solution and have restored the type signature to the more natural EdgeRDD[ED] type.

Getting Started

To get started you first need to import Spark and GraphX into your project, as follows:

{% highlight scala %}
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
{% endhighlight %}

If you are not using the Spark shell you will also need a SparkContext. To learn more about getting started with Spark refer to the Spark Quick Start Guide.

The Property Graph

The property graph is a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph is a directed graph with potentially multiple parallel edges sharing the same source and destination vertex. The ability to support parallel edges simplifies modeling scenarios where there can be multiple relationships (e.g., co-worker and friend) between the same vertices. Each vertex is keyed by a unique 64-bit long identifier (VertexId). GraphX does not impose any ordering constraints on the vertex identifiers. Similarly, edges have corresponding source and destination vertex identifiers.

The property graph is parameterized over the vertex (VD) and edge (ED) types. These are the types of the objects associated with each vertex and edge respectively.

GraphX optimizes the representation of vertex and edge types when they are primitive data types (e.g., int, double), reducing the in-memory footprint by storing them in specialized arrays.

In some cases it may be desirable to have vertices with different property types in the same graph. This can be accomplished through inheritance. For example to model users and products as a bipartite graph we might do the following:

{% highlight scala %}
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
{% endhighlight %}

Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or structure of the graph are accomplished by producing a new graph with the desired changes. Note that substantial parts of the original graph (i.e., unaffected structure, attributes, and indices) are reused in the new graph reducing the cost of this inherently functional data structure. The graph is partitioned across the executors using a range of vertex partitioning heuristics. As with RDDs, each partition of the graph can be recreated on a different machine in the event of a failure.
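The immutability described above can be observed directly. The following is a minimal sketch, assuming a local SparkContext and a hypothetical two-vertex graph; the object name `ImmutableGraphSketch` and the sample data are illustrative only and do not come from the GraphX sources.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object ImmutableGraphSketch {
  // Returns (original attributes, transformed attributes), each sorted by vertex id.
  def run(sc: SparkContext): (Seq[(Long, Int)], Seq[(Long, Int)]) = {
    // Hypothetical sample data: two vertices joined by one edge.
    val vertices = sc.parallelize(Seq((1L, 10), (2L, 20)))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "follows")))
    val graph = Graph(vertices, edges)

    // mapVertices does not modify `graph`; it returns a new graph.
    val doubled = graph.mapVertices((_, attr) => attr * 2)

    val before = graph.vertices.collect().sortBy(_._1).toSeq
    val after = doubled.vertices.collect().sortBy(_._1).toSeq
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is sufficient for this illustration.
    val conf = new SparkConf().setAppName("ImmutableGraphSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (before, after) = run(sc)
      println(s"original:    $before")
      println(s"transformed: $after")
    } finally {
      sc.stop()
    }
  }
}
```

The original `graph` keeps its vertex attributes after the call, while `doubled` carries the new ones.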

Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access the vertices and edges of the graph:

{% highlight scala %}
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
{% endhighlight %}

The classes VertexRDD[VD] and EdgeRDD[ED] extend and are optimized versions of RDD[(VertexId, VD)] and RDD[Edge[ED]] respectively. Both VertexRDD[VD] and EdgeRDD[ED] provide additional functionality built around graph computation and leverage internal optimizations. We discuss the VertexRDD and EdgeRDD API in greater detail in the section on vertex and edge RDDs but for now they can be thought of as simply RDDs of the form: RDD[(VertexId, VD)] and RDD[Edge[ED]].

Example Property Graph

Suppose we want to construct a property graph consisting of the various collaborators on the GraphX project. The vertex property might contain the username and occupation. We could annotate edges with a string describing the relationships between collaborators:

[Figure: The Property Graph]

The resulting graph would have the type signature:

{% highlight scala %}
val userGraph: Graph[(String, String), String]
{% endhighlight %}

There are numerous ways to construct a property graph from raw files, RDDs, and even synthetic generators and these are discussed in more detail in the section on graph builders. Probably the most general method is to use the Graph object. For example the following code constructs a graph from a collection of RDDs:

{% highlight scala %}
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
{% endhighlight %}

In the above example we make use of the Edge case class. Edges have a srcId and a dstId corresponding to the source and destination vertex identifiers. In addition, the Edge class has an attr member which stores the edge property.

We can deconstruct a graph into the respective vertex and edge views by using the graph.vertices and graph.edges members respectively.

{% highlight scala %}
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
{% endhighlight %}

Note that graph.vertices returns a VertexRDD[(String, String)] which extends RDD[(VertexId, (String, String))] and so we use the Scala case expression to deconstruct the tuple. On the other hand, graph.edges returns an EdgeRDD containing Edge[String] objects. We could have also used the case class type constructor as in the following:

{% highlight scala %}
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
{% endhighlight %}

In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. This join can be expressed in the following SQL expression:

{% highlight sql %}
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
{% endhighlight %}

or graphically as:

[Figure: Edge Triplet]

The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members which contain the source and destination properties respectively. We can use the triplet view of a graph to render a collection of strings describing relationships between users.

{% highlight scala %}
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
  graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
{% endhighlight %}

Graph Operators

Just as RDDs have basic operations like map, filter, and reduceByKey, property graphs also have a collection of basic operators that take user-defined functions and produce new graphs with transformed properties and structure. The core operators that have optimized implementations are defined in Graph, and convenient operators that are expressed as compositions of the core operators are defined in GraphOps. However, thanks to Scala implicits, the operators in GraphOps are automatically available as members of Graph. For example, we can compute the in-degree of each vertex (defined in GraphOps) by the following:

{% highlight scala %}
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
{% endhighlight %}

The reason for differentiating between core graph operations and GraphOps is to be able to support different graph representations in the future. Each graph representation must provide implementations of the core operations and reuse many of the useful operations defined in GraphOps.

Summary List of Operators

The following is a quick summary of the functionality defined in both Graph and GraphOps but presented as members of Graph for simplicity. Note that some function signatures have been simplified (e.g., default arguments and type constraints removed) and some more advanced functionality has been removed so please consult the API docs for the official list of operations.

{% highlight scala %}
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
  // Information about the Graph ===================================================================
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
  // Views of the graph as collections =============================================================
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
  // Functions for caching graphs ==================================================================
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def cache(): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  // Change the partitioning heuristic =============================================================
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  // Transform vertex and edge attributes ==========================================================
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
    : Graph[VD, ED2]
  // Modify the graph structure ====================================================================
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true))
    : Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  // Join RDDs with the graph ======================================================================
  def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
  // Aggregate information about adjacent triplets =================================================
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def aggregateMessages[Msg: ClassTag](
      sendMsg: EdgeContext[VD, ED, Msg] => Unit,
      mergeMsg: (Msg, Msg) => Msg,
      tripletFields: TripletFields = TripletFields.All)
    : VertexRDD[Msg]
  // Iterative graph-parallel computation ==========================================================
  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
  // Basic graph algorithms ========================================================================
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def connectedComponents(): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
{% endhighlight %}

Property Operators

Like the RDD map operator, the property graph contains the following:

{% highlight scala %}
class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
{% endhighlight %}

Each of these operators yields a new graph with the vertex or edge properties modified by the user defined map function.

Note that in each case the graph structure is unaffected. This is a key feature of these operators which allows the resulting graph to reuse the structural indices of the original graph. The following snippets are logically equivalent, but the first one does not preserve the structural indices and would not benefit from the GraphX system optimizations:

{% highlight scala %}
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
{% endhighlight %}

Instead, use mapVertices to preserve the indices:

{% highlight scala %}
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
{% endhighlight %}

These operators are often used to initialize the graph for a particular computation or project away unnecessary properties. For example, given a graph with the out degrees as the vertex properties (we describe how to construct such a graph later), we initialize it for PageRank:

{% highlight scala %}
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
  graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
  inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
{% endhighlight %}

Structural Operators

Currently GraphX supports only a simple set of commonly used structural operators and we expect to add more in the future. The following is a list of the basic structural operators.

{% highlight scala %}
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
{% endhighlight %}

The reverse operator returns a new graph with all the edge directions reversed. This can be useful when, for example, trying to compute the inverse PageRank. Because the reverse operation does not modify vertex or edge properties or change the number of edges, it can be implemented efficiently without data movement or duplication.
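As an illustration of the reverse operator, the sketch below builds a hypothetical three-edge graph and checks that the in-degrees of the original match the out-degrees of the reversed graph. It assumes a local SparkContext; the object name `ReverseSketch` and the edge data are made up for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object ReverseSketch {
  // Returns (in-degrees of the original, out-degrees of the reversed graph), sorted by id.
  def run(sc: SparkContext): (Seq[(Long, Int)], Seq[(Long, Int)]) = {
    // Hypothetical toy graph: 1 -> 2, 1 -> 3, 2 -> 3
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "a"), Edge(1L, 3L, "b"), Edge(2L, 3L, "c")))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // Every edge direction is flipped; attributes are untouched.
    val reversed = graph.reverse

    val inDeg = graph.inDegrees.collect().sortBy(_._1).toSeq
    val outDegReversed = reversed.outDegrees.collect().sortBy(_._1).toSeq
    (inDeg, outDegReversed)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is sufficient for this illustration.
    val conf = new SparkConf().setAppName("ReverseSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (inDeg, outDegReversed) = run(sc)
      println(s"in-degrees (original):  $inDeg")
      println(s"out-degrees (reversed): $outDegReversed")
    } finally {
      sc.stop()
    }
  }
}
```

Note that inDegrees and outDegrees only list vertices with at least one matching edge, so vertex 1 does not appear in either result.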

The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate (evaluate to true) and edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in a number of situations to restrict the graph to the vertices and edges of interest or eliminate broken links. For example, in the following code we remove broken links:

{% highlight scala %}
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                       (4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                       Edge(4L, 0L, "student"),   Edge(5L, 0L, "colleague")))
// Define a default user in case a relationship refers to a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
  triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
{% endhighlight %}

Note that in the above example only the vertex predicate is provided. A vertex or edge predicate that is not provided defaults to one that always returns true.
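The complementary case, where only the edge predicate is supplied, can be sketched as follows. This is an illustration under assumptions: it uses a local SparkContext and a three-user subset of the collaborator data from this guide, and the object name `EdgePredicateSketch` is hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object EdgePredicateSketch {
  // Returns (number of vertices kept, surviving edges as (srcId, dstId) pairs).
  def run(sc: SparkContext): (Long, Seq[(Long, Long)]) = {
    val users = sc.parallelize(Seq(
      (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof"))))
    val relationships = sc.parallelize(Seq(
      Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(5L, 7L, "pi")))
    val graph = Graph(users, relationships)

    // Only epred is given, so vpred keeps its default and every vertex survives.
    val advisorOnly = graph.subgraph(epred = triplet => triplet.attr == "advisor")

    val keptEdges = advisorOnly.edges.collect().map(e => (e.srcId, e.dstId)).toSeq
    (advisorOnly.numVertices, keptEdges)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is sufficient for this illustration.
    val conf = new SparkConf().setAppName("EdgePredicateSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (numVertices, keptEdges) = run(sc)
      println(s"vertices kept: $numVertices, edges kept: $keptEdges")
    } finally {
      sc.stop()
    }
  }
}
```

All three vertices remain in the result even though two of them no longer share an edge, because the default vertex predicate accepts every vertex.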

The mask operator constructs a subgraph by returning a graph that contains the vertices and edges that are also found in the input graph. This can be used in conjunction with the subgraph operator to restrict a graph based on the properties in another related graph. For example, we might run connected components using the graph with missing vertices and then restrict the answer to the valid subgraph.

{% highlight scala %}
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
{% endhighlight %}

The groupEdges operator merges parallel edges (i.e., duplicate edges between pairs of vertices) in the multigraph. In many numerical applications, parallel edges can be added (their weights combined) into a single edge thereby reducing the size of the graph.
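A minimal sketch of groupEdges on weighted edges follows. It assumes a local SparkContext and hypothetical edge weights; the object name `GroupEdgesSketch` is illustrative. The graph is repartitioned with partitionBy first, since the GraphX API documentation for groupEdges states that this is required for correct results (parallel edges must be co-located in the same partition to be merged).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

object GroupEdgesSketch {
  // Returns the merged edges as (srcId, dstId, weight), sorted by endpoints.
  def run(sc: SparkContext): Seq[(Long, Long, Double)] = {
    // Hypothetical weighted multigraph with two parallel edges from 1 to 2.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 1.0), Edge(1L, 2L, 2.5), Edge(2L, 3L, 4.0)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // Co-locate parallel edges, then sum their weights into a single edge.
    val merged = graph
      .partitionBy(PartitionStrategy.EdgePartition2D)
      .groupEdges(_ + _)

    merged.edges.collect()
      .map(e => (e.srcId, e.dstId, e.attr))
      .sortBy(t => (t._1, t._2))
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is sufficient for this illustration.
    val conf = new SparkConf().setAppName("GroupEdgesSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      run(sc).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Summation is only one possible merge function; any associative function of type (ED, ED) => ED, such as taking the maximum weight, could be passed instead.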

Join Operators

In many cases it is necessary to join data from external collections (RDDs) with graphs. For example, we might have extra user properties that we want to merge with an existing graph or we might want to pull vertex properties from one graph into another. These tasks can be accomplished using the join operators. Below we list the key join operators:

{% highlight scala %}
class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
{% endhighlight %}

The joinVertices operator joins the vertices with the input RDD and returns a new graph with the vertex properties obtained by applying the user defined map function to the result of the joined vertices. Vertices without a matching value in the RDD retain their original value.
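The behavior for unmatched vertices can be seen in a small sketch. It assumes a local SparkContext; the user names, the `titles` RDD, and the object name `JoinVerticesSketch` are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object JoinVerticesSketch {
  // Returns the vertex attributes after the join, sorted by vertex id.
  def run(sc: SparkContext): Seq[(Long, String)] = {
    // Hypothetical sample graph with three named users.
    val users = sc.parallelize(Seq((1L, "alice"), (2L, "bob"), (3L, "carol")))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 3L, "friend")))
    val graph = Graph(users, edges)

    // External data that covers vertices 1 and 3 only.
    val titles = sc.parallelize(Seq((1L, "Dr."), (3L, "Prof.")))

    // The map function runs only for vertices with a match in `titles`;
    // vertex 2 has no match and keeps its original attribute.
    val joined = graph.joinVertices(titles)((_, name, title) => s"$title $name")

    joined.vertices.collect().sortBy(_._1).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master is sufficient for this illustration.
    val conf = new SparkConf().setAppName("JoinVerticesSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      run(sc).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

Because joinVertices must return the existing vertex type VD, it suits updates like this one; changing the vertex type requires outerJoinVertices, whose map function receives an Option[U].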