Skip to content
Snippets Groups Projects
Commit 80e4d98d authored by Joseph E. Gonzalez's avatar Joseph E. Gonzalez
Browse files

Improving documentation and identifying potential bug in CC calculation.

parent 8ca97739
No related branches found
No related tags found
No related merge requests found
......@@ -84,7 +84,8 @@ import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
{% endhighlight %}
If you are not using the Spark shell you will also need a Spark context.
If you are not using the Spark shell you will also need a `SparkContext`. To learn more about
getting started with Spark refer to the [Spark Quick Start Guide](quick-start.html).
# The Property Graph
<a name="property_graph"></a>
......@@ -190,7 +191,7 @@ and `graph.edges` members respectively.
{% highlight scala %}
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc"}.count
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
{% endhighlight %}
......@@ -258,8 +259,10 @@ val graph: Graph[(String, String), String]
val indDegrees: VertexRDD[Int] = graph.inDegrees
{% endhighlight %}
The reason for differentiating between core graph operations and GraphOps is to be able to support
various graph representations in the future.
The reason for differentiating between core graph operations and [`GraphOps`][GraphOps] is to be
able to support different graph representations in the future. Each graph representation must
provide implementations of the core operations and reuse many of the useful operations defined in
[`GraphOps`][GraphOps].
## Property Operators
......@@ -334,14 +337,32 @@ interest or eliminate broken links. For example in the following code we remove
[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])⇒Boolean,(VertexID,VD)⇒Boolean):Graph[VD,ED]
{% highlight scala %}
val users: RDD[(VertexId, (String, String))]
val edges: RDD[Edge[String]]
// Create an RDD for the vertices
val users: RDD[(VertexID, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connecting users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
{% endhighlight %}
> Note in the above example only the vertex predicate is provided. The `subgraph` operator defaults
......
......@@ -325,8 +325,8 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents]]
*/
def connectedComponents(): Graph[VertexID, ED] = {
ConnectedComponents.run(graph)
def connectedComponents(undirected: Boolean = true): Graph[VertexID, ED] = {
ConnectedComponents.run(graph, undirected)
}
/**
......
......@@ -14,26 +14,42 @@ object ConnectedComponents {
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the connected components
* @param undirected compute reachability ignoring edge direction.
*
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexID, ED] = {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], undirected: Boolean = true):
Graph[VertexID, ED] = {
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Iterator((edge.srcId, edge.dstAttr))
} else {
Iterator.empty
if (undirected) {
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else if (edge.srcAttr > edge.dstAttr) {
Iterator((edge.srcId, edge.dstAttr))
} else {
Iterator.empty
}
}
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Both)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
} else {
def sendMessage(edge: EdgeTriplet[VertexID, ED]) = {
if (edge.srcAttr < edge.dstAttr) {
Iterator((edge.dstId, edge.srcAttr))
} else {
Iterator.empty
}
}
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Out)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
}
val initialMessage = Long.MaxValue
Pregel(ccGraph, initialMessage)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
} // end of connectedComponents
}
......@@ -80,4 +80,34 @@ class ConnectedComponentsSuite extends FunSuite with LocalSparkContext {
}
} // end of reverse chain connected components
test("Connected Components on a Toy Connected Graph") {
withSpark { sc =>
// Create an RDD for the vertices
val users: RDD[(VertexID, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Edges are:
// 2 ---> 5 ---> 3
// | \
// V \|
// 4 ---> 0 7
//
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
val ccGraph = graph.connectedComponents(undirected = true)
val vertices = ccGraph.vertices.collect
for ( (id, cc) <- vertices ) {
assert(cc == 0)
}
}
} // end of toy connected components
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment