Commit d0a5c32b authored by Jason Lee, committed by Sean Owen

[SPARK-12655][GRAPHX] GraphX does not unpersist RDDs

Some VertexRDDs and EdgeRDDs are created during intermediate steps of g.connectedComponents() but are unnecessarily left cached after the method returns. The fix is to unpersist these RDDs once they are no longer in use.

A test case is added to confirm the fix for the reported bug.

Author: Jason Lee <cjlee@us.ibm.com>

Closes #10713 from jasoncl/SPARK-12655.
parent fe7246fe
@@ -151,7 +151,7 @@ object Pregel extends Logging {
       // count the iteration
       i += 1
     }
+    messages.unpersist(blocking = false)
     g
   } // end of apply
@@ -47,9 +47,11 @@ object ConnectedComponents {
       }
     }
     val initialMessage = Long.MaxValue
-    Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
+    val pregelGraph = Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
       vprog = (id, attr, msg) => math.min(attr, msg),
       sendMsg = sendMessage,
       mergeMsg = (a, b) => math.min(a, b))
+    ccGraph.unpersist()
+    pregelGraph
   } // end of connectedComponents
 }
@@ -428,4 +428,20 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
+  test("unpersist graph RDD") {
+    withSpark { sc =>
+      val vert = sc.parallelize(List((1L, "a"), (2L, "b"), (3L, "c")), 1)
+      val edges = sc.parallelize(List(Edge[Long](1L, 2L), Edge[Long](1L, 3L)), 1)
+      val g0 = Graph(vert, edges)
+      val g = g0.partitionBy(PartitionStrategy.EdgePartition2D, 2)
+      val cc = g.connectedComponents()
+      assert(sc.getPersistentRDDs.nonEmpty)
+      cc.unpersist()
+      g.unpersist()
+      g0.unpersist()
+      vert.unpersist()
+      edges.unpersist()
+      assert(sc.getPersistentRDDs.isEmpty)
+    }
+  }
 }
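
The new test relies on `sc.getPersistentRDDs` to detect RDDs that stay marked as cached. The same check can be run outside the test suite. The following is a minimal sketch, assuming a local Spark installation with GraphX on the classpath; `LeakCheck` and `newlyCachedRddIds` are hypothetical names introduced here for illustration and are not part of Spark or of this commit.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Hypothetical helper, not part of Spark or of this commit.
object LeakCheck {

  /** Runs `body` and returns its result together with the ids of RDDs that
    * were marked persistent during the call and are still marked afterwards.
    * getPersistentRDDs reports RDDs marked via cache()/persist(); it does not
    * guarantee that their blocks were actually materialized.
    */
  def newlyCachedRddIds[T](sc: SparkContext)(body: => T): (T, Set[Int]) = {
    val before = sc.getPersistentRDDs.keySet.toSet
    val result = body
    val after = sc.getPersistentRDDs.keySet.toSet
    (result, after -- before)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("leak-check").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
      val edges = sc.parallelize(Seq(Edge(1L, 2L, 0L), Edge(1L, 3L, 0L)))
      val g = Graph(vertices, edges)

      // Ids of RDDs that connectedComponents() left marked as cached.
      val (cc, cached) = newlyCachedRddIds(sc) { g.connectedComponents() }
      println(s"RDD ids still cached after connectedComponents: $cached")

      // Release the graphs the caller owns, then inspect what remains.
      cc.unpersist(blocking = false)
      g.unpersist(blocking = false)
      println(s"RDD ids still cached after cleanup: ${sc.getPersistentRDDs.keySet}")
    } finally {
      sc.stop()
    }
  }
}
```

Comparing the id sets before and after the call isolates what a single method leaves behind, which is the kind of residue this commit removes from `connectedComponents()`. The returned graph itself is expected to remain cached until the caller unpersists it.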