diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md index 7f1559d1e294c2c7b69d751ea73936e8c185259b..002ba0cf734286b7745076091819adc6d77289f8 100644 --- a/docs/graphx-programming-guide.md +++ b/docs/graphx-programming-guide.md @@ -84,7 +84,8 @@ import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD {% endhighlight %} -If you are not using the Spark shell you will also need a Spark context. +If you are not using the Spark shell you will also need a `SparkContext`. To learn more about +getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html). # The Property Graph <a name="property_graph"></a> @@ -190,7 +191,7 @@ and `graph.edges` members respectively. {% highlight scala %} val graph: Graph[(String, String), String] // Constructed from above // Count all users which are postdocs -graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc"}.count +graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count // Count all the edges where src > dst graph.edges.filter(e => e.srcId > e.dstId).count {% endhighlight %} @@ -258,8 +259,10 @@ val graph: Graph[(String, String), String] val indDegrees: VertexRDD[Int] = graph.inDegrees {% endhighlight %} -The reason for differentiating between core graph operations and GraphOps is to be able to support -various graph representations in the future. +The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be +able to support different graph representations in the future. Each graph representation must +provide implementations of the core operations and reuse many of the useful operations defined in +[`GraphOps`][GraphOps]. ## Property Operators @@ -334,14 +337,32 @@ interest or eliminate broken links. For example in the following code we remove [Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED] {% highlight scala %} -val users: RDD[(VertexId, (String, String))] -val edges: RDD[Edge[String]] +// Create an RDD for the vertices +val users: RDD[(VertexID, (String, String))] = + sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) +// Create an RDD for edges +val relationships: RDD[Edge[String]] = + sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser) +// Notice that there is a user 0 (for which we have no information) connecting users +// 4 (peter) and 5 (franklin). +graph.triplets.map( + triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 + ).collect.foreach(println(_)) // Remove missing vertices as well as the edges to connected to them val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing") +// The valid subgraph will disconnect users 4 and 5 by removing user 0 +validGraph.vertices.collect.foreach(println(_)) +validGraph.triplets.map( + triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1 + ).collect.foreach(println(_)) {% endhighlight %} > Note in the above example only the vertex predicate is provided. The `subgraph` operator defaults diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 2b3b95e2ca70ecf6799f80a434adc68c07b343c4..a0a40e2d9a29c422f2a12e9160cde7526c0416ca 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) { * * @see [[org.apache.spark.graphx.lib.ConnectedComponents]] */ - def connectedComponents(): Graph[VertexID, ED] = { - ConnectedComponents.run(graph) + def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = { + ConnectedComponents.run(graph, undirected) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala index 4a83e2dbb80d27a8b6ee0853dda3fa05c1ef71f6..d078d2acdbc61dc36b280aa611dc7300b6821866 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala @@ -14,26 +14,42 @@ object ConnectedComponents { * @tparam ED the edge attribute type (preserved in the computation) * * @param graph the graph for which to compute the connected components + * @param undirected compute reachability ignoring edge direction. * * @return a graph with vertex attributes containing the smallest vertex in each * connected component */ - def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = { + def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true): + Graph[VertexID, ED] = { val ccGraph = graph.mapVertices { case (vid, _) => vid } - - def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { - if (edge.srcAttr < edge.dstAttr) { - Iterator((edge.dstId, edge.srcAttr)) - } else if (edge.srcAttr > edge.dstAttr) { - Iterator((edge.srcId, edge.dstAttr)) - } else { - Iterator.empty + if (undirected) { + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else if (edge.srcAttr > edge.dstAttr) { + Iterator((edge.srcId, edge.dstAttr)) + } else { + Iterator.empty + } + } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) + } else { + def sendMessage(edge: EdgeTriplet[VertexID, ED]) = { + if (edge.srcAttr < edge.dstAttr) { + Iterator((edge.dstId, edge.srcAttr)) + } else { + Iterator.empty + } } + val initialMessage = Long.MaxValue + Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)( + vprog = (id, attr, msg) => math.min(attr, msg), + sendMsg = sendMessage, + mergeMsg = (a, b) => math.min(a, b)) } - val initialMessage = Long.MaxValue - Pregel(ccGraph, initialMessage)( - vprog = (id, attr, msg) => math.min(attr, msg), - sendMsg = sendMessage, - mergeMsg = (a, b) => math.min(a, b)) } // end of connectedComponents } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala index 66612b381f1938f212f5bc7326a2f31933e0efb7..86da8f1b4641e90e0c825282f2e415cd975a63d2 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala @@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext { } } // end of reverse chain connected components + test("Connected Components on a Toy Connected Graph") { + withSpark { sc => + // Create an RDD for the vertices + val users: RDD[(VertexID, (String, String))] = + sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), + (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), + (4L, ("peter", "student")))) + // Create an RDD for edges + val relationships: RDD[Edge[String]] = + sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), + Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"), + Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague"))) + // Edges are: + // 2 ---> 5 ---> 3 + // | \ + // V \| + // 4 ---> 0 7 + // + // Define a default user in case there are relationship with missing user + val defaultUser = ("John Doe", "Missing") + // Build the initial Graph + val graph = Graph(users, relationships, defaultUser) + val ccGraph = graph.connectedComponents(undirected = true) + val vertices = ccGraph.vertices.collect + for ( (id, cc) <- vertices ) { + assert(cc == 0) + } + } + } // end of toy connected components + }