Skip to content
Snippets Groups Projects
Commit e895e0cb authored by GuoQiang Li's avatar GuoQiang Li Committed by Ankur Dave
Browse files

[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li <witgo@qq.com>

Closes #2631 from witgo/SPARK-3623 and squashes the following commits:

a70c500 [GuoQiang Li] Remove java related
4d1e249 [GuoQiang Li] Add comments
e682724 [GuoQiang Li] Graph should support the checkpoint operation
parent 6eb1b6f6
No related branches found
No related tags found
No related merge requests found
......@@ -96,6 +96,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def cache(): Graph[VD, ED]
/**
* Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint
* directory set with SparkContext.setCheckpointDir() and all references to its parent
* RDDs will be removed. It is strongly recommended that this Graph is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit
/**
* Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative
* algorithms that modify the vertex attributes but reuse the edges. This method can be used to
......
......@@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
this
}
override def checkpoint(): Unit = {
vertices.checkpoint()
replicatedVertexView.edges.checkpoint()
}
override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
vertices.unpersist(blocking)
// TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
......
......@@ -19,6 +19,8 @@ package org.apache.spark.graphx
import org.scalatest.FunSuite
import com.google.common.io.Files
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph._
import org.apache.spark.graphx.PartitionStrategy._
......@@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}
test("checkpoint") {
val checkpointDir = Files.createTempDir()
checkpointDir.deleteOnExit()
withSpark { sc =>
sc.setCheckpointDir(checkpointDir.getAbsolutePath)
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)}
val rdd = sc.parallelize(ring)
val graph = Graph.fromEdges(rdd, 1.0F)
graph.checkpoint()
graph.edges.map(_.attr).count()
graph.vertices.map(_._2).count()
val edgesDependencies = graph.edges.partitionsRDD.dependencies
val verticesDependencies = graph.vertices.partitionsRDD.dependencies
assert(edgesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
assert(verticesDependencies.forall(_.rdd.isInstanceOf[CheckpointRDD[_]]))
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment