Skip to content
Snippets Groups Projects
Commit 6ce7c481 authored by Zheng RuiFeng's avatar Zheng RuiFeng Committed by Reynold Xin
Browse files

[SPARK-13386][GRAPHX] ConnectedComponents should support maxIteration option

JIRA: https://issues.apache.org/jira/browse/SPARK-13386

## What changes were proposed in this pull request?

add maxIteration option for ConnectedComponents algorithm

## How was the this patch tested?

unit tests passed

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #11268 from zhengruifeng/ccwithmax.
parent 9ca79c1e
No related branches found
No related tags found
No related merge requests found
......@@ -405,14 +405,24 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
PageRank.run(graph, numIter, resetProb)
}
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
}
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
*/
def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
def connectedComponents(maxIterations: Int): Graph[VertexId, ED] = {
ConnectedComponents.run(graph, maxIterations)
}
/**
......
......@@ -29,13 +29,14 @@ object ConnectedComponents {
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
*
* @param graph the graph for which to compute the connected components
*
* @param maxIterations the maximum number of iterations to run for
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
maxIterations: Int): Graph[VertexId, ED] = {
require(maxIterations > 0)
val ccGraph = graph.mapVertices { case (vid, _) => vid }
def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
if (edge.srcAttr < edge.dstAttr) {
......@@ -47,11 +48,26 @@ object ConnectedComponents {
}
}
val initialMessage = Long.MaxValue
val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
val pregelGraph = Pregel(ccGraph, initialMessage,
maxIterations, EdgeDirection.Either)(
vprog = (id, attr, msg) => math.min(attr, msg),
sendMsg = sendMessage,
mergeMsg = (a, b) => math.min(a, b))
ccGraph.unpersist()
pregelGraph
} // end of connectedComponents
/**
* Compute the connected component membership of each vertex and return a graph with the vertex
* value containing the lowest vertex id in the connected component containing that vertex.
*
* @tparam VD the vertex attribute type (discarded in the computation)
* @tparam ED the edge attribute type (preserved in the computation)
* @param graph the graph for which to compute the connected components
* @return a graph with vertex attributes containing the smallest vertex in each
* connected component
*/
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
run(graph, Int.MaxValue)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment