From 1e5535cfcf3717f76e990a3800b07a05ddafe2e2 Mon Sep 17 00:00:00 2001 From: Dan Crankshaw <dscrankshaw@gmail.com> Date: Fri, 11 Oct 2013 16:38:52 -0700 Subject: [PATCH] Added connected components back --- .../org/apache/spark/graph/Analytics.scala | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index b8c2f5186e..59af021c8d 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -64,7 +64,7 @@ object Analytics extends Logging { * lowest vertex id in the connected component containing * that vertex. */ - def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED]) = { + def connectedComponents[VD: Manifest, ED: Manifest](graph: Graph[VD, ED], numIter: Int) = { val ccGraph = graph.mapVertices { case Vertex(vid, _) => vid } GraphLab.iterate(ccGraph)( @@ -72,6 +72,7 @@ object Analytics extends Logging { (a: Vid, b: Vid) => math.min(a, b), // merge (v, a: Option[Vid]) => math.min(v.data, a.getOrElse(Long.MaxValue)), // apply (me_id, edge) => (edge.vertex(me_id).data < edge.otherVertex(me_id).data), // scatter + numIter, gatherDirection = EdgeDirection.Both, scatterDirection = EdgeDirection.Both ) } @@ -157,38 +158,38 @@ object Analytics extends Logging { sc.stop() } -// case "cc" => { -// -// var numIter = Int.MaxValue -// var isDynamic = false -// -// options.foreach{ -// case ("numIter", v) => numIter = v.toInt -// case ("dynamic", v) => isDynamic = v.toBoolean -// case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) -// } -// -// if(!isDynamic && numIter == Int.MaxValue) { -// println("Set number of iterations!") -// sys.exit(1) -// } -// println("======================================") -// println("| Connected Components |") -// println("--------------------------------------") -// println(" Using parameters:") -// println(" \tDynamic: " + isDynamic) -// println(" \tNumIter: " + numIter) -// println("======================================") -// -// val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") -// val graph = GraphLoader.textFile(sc, fname, a => 1.0F) -// //val cc = Analytics.connectedComponents(graph, numIter) -// // val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) -// // else Analytics.connectedComponents(graph, numIter) -// println("Components: " + cc.vertices.map(_.data).distinct()) -// -// sc.stop() -// } + case "cc" => { + + var numIter = Int.MaxValue + var isDynamic = false + + options.foreach{ + case ("numIter", v) => numIter = v.toInt + case ("dynamic", v) => isDynamic = v.toBoolean + case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) + } + + if(!isDynamic && numIter == Int.MaxValue) { + println("Set number of iterations!") + sys.exit(1) + } + println("======================================") + println("| Connected Components |") + println("--------------------------------------") + println(" Using parameters:") + println(" \tDynamic: " + isDynamic) + println(" \tNumIter: " + numIter) + println("======================================") + + val sc = new SparkContext(host, "ConnectedComponents(" + fname + ")") + val graph = GraphLoader.textFile(sc, fname, a => 1.0F) + val cc = Analytics.connectedComponents(graph, numIter) + //val cc = if(isDynamic) Analytics.dynamicConnectedComponents(graph, numIter) + // else Analytics.connectedComponents(graph, numIter) + println("Components: " + cc.vertices.map(_.data).distinct()) + + sc.stop() + } // // case "shortestpath" => { // -- GitLab