diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala index 94cfa7e126e0d47de67e51c459a7f6ac0e189052..7efc69c64e3c73c52b77dc5c7663740633b094eb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLab.scala @@ -110,25 +110,21 @@ object GraphLab extends Logging { activeGraph.aggregateNeighbors(gather, mergeFunc, gatherDirection) // Apply - val applied = activeGraph.outerJoinVertices(gathered)(apply) + val applied = activeGraph.outerJoinVertices(gathered)(apply).cache() // Scatter is basically a gather in the opposite direction so we reverse the edge direction val scattered: RDD[(VertexID, Boolean)] = applied.aggregateNeighbors(scatter, _ || _, scatterDirection.reverse) prevActiveGraph = activeGraph - activeGraph = activeGraph.outerJoinVertices(scattered)(applyActive).cache() + activeGraph = applied.outerJoinVertices(scattered)(applyActive).cache() - // Calculate the number of active vertices. The call to reduce() materializes the vertices of - // `activeGraph`, hiding the vertices of `prevActiveGraph`. + // Calculate the number of active vertices. numActive = activeGraph.vertices.map{ case (vid, data) => if (data._1) 1 else 0 }.reduce(_ + _) logInfo("Number active vertices: " + numActive) - // Unpersist the RDDs hidden by newly-materialized RDDs - prevActiveGraph.unpersistVertices(blocking=false) - i += 1 }