diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala old mode 100644 new mode 100755 index 1fa92b019541050a097395cb891dd158a8abf802..e4f80ffcb451b59493558e7cfbe0f7259b849fa8 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala @@ -44,6 +44,9 @@ object StronglyConnectedComponents { // graph we are going to work with in our iterations var sccWorkGraph = graph.mapVertices { case (vid, _) => (vid, false) }.cache() + // helper variables to unpersist cached graphs + var prevSccGraph = sccGraph + var numVertices = sccWorkGraph.numVertices var iter = 0 while (sccWorkGraph.numVertices > 0 && iter < numIter) { @@ -64,48 +67,59 @@ object StronglyConnectedComponents { // write values to sccGraph sccGraph = sccGraph.outerJoinVertices(finalVertices) { (vid, scc, opt) => opt.getOrElse(scc) - } + }.cache() + // materialize vertices and edges + sccGraph.vertices.count() + sccGraph.edges.count() + // sccGraph materialized so, unpersist can be done on previous + prevSccGraph.unpersist(blocking = false) + prevSccGraph = sccGraph + // only keep vertices that are not final sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache() } while (sccWorkGraph.numVertices < numVertices) - sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) } + // if iter < numIter at this point sccGraph that is returned + // will not be recomputed and pregel executions are pointless + if (iter < numIter) { + sccWorkGraph = sccWorkGraph.mapVertices { case (vid, (color, isFinal)) => (vid, isFinal) } - // collect min of all my neighbor's scc values, update if it's smaller than mine - // then notify any neighbors with scc values larger than mine - sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId]( - sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( - (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), - e => { - if (e.srcAttr._1 < e.dstAttr._1) { - Iterator((e.dstId, e.srcAttr._1)) - } else { - Iterator() - } - }, - (vid1, vid2) => math.min(vid1, vid2)) + // collect min of all my neighbor's scc values, update if it's smaller than mine + // then notify any neighbors with scc values larger than mine + sccWorkGraph = Pregel[(VertexId, Boolean), ED, VertexId]( + sccWorkGraph, Long.MaxValue, activeDirection = EdgeDirection.Out)( + (vid, myScc, neighborScc) => (math.min(myScc._1, neighborScc), myScc._2), + e => { + if (e.srcAttr._1 < e.dstAttr._1) { + Iterator((e.dstId, e.srcAttr._1)) + } else { + Iterator() + } + }, + (vid1, vid2) => math.min(vid1, vid2)) - // start at root of SCCs. Traverse values in reverse, notify all my neighbors - // do not propagate if colors do not match! - sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean]( - sccWorkGraph, false, activeDirection = EdgeDirection.In)( - // vertex is final if it is the root of a color - // or it has the same color as a neighbor that is final - (vid, myScc, existsSameColorFinalNeighbor) => { - val isColorRoot = vid == myScc._1 - (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor) - }, - // activate neighbor if they are not final, you are, and you have the same color - e => { - val sameColor = e.dstAttr._1 == e.srcAttr._1 - val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2 - if (sameColor && onlyDstIsFinal) { - Iterator((e.srcId, e.dstAttr._2)) - } else { - Iterator() - } - }, - (final1, final2) => final1 || final2) + // start at root of SCCs. Traverse values in reverse, notify all my neighbors + // do not propagate if colors do not match! + sccWorkGraph = Pregel[(VertexId, Boolean), ED, Boolean]( + sccWorkGraph, false, activeDirection = EdgeDirection.In)( + // vertex is final if it is the root of a color + // or it has the same color as a neighbor that is final + (vid, myScc, existsSameColorFinalNeighbor) => { + val isColorRoot = vid == myScc._1 + (myScc._1, myScc._2 || isColorRoot || existsSameColorFinalNeighbor) + }, + // activate neighbor if they are not final, you are, and you have the same color + e => { + val sameColor = e.dstAttr._1 == e.srcAttr._1 + val onlyDstIsFinal = e.dstAttr._2 && !e.srcAttr._2 + if (sameColor && onlyDstIsFinal) { + Iterator((e.srcId, e.dstAttr._2)) + } else { + Iterator() + } + }, + (final1, final2) => final1 || final2) + } } sccGraph }