diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 087270e46dd90e4fe84317789e1da3c774d50854..b3698ffa44d57f192c052e03d5c6551c1b9a2fdf 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -307,16 +307,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] JavaPairRDD.fromRDD(rdd.keyBy(f)) } - - /** - * Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir` - * (set using setCheckpointDir()) and all references to its parent RDDs will be removed. - * This is used to truncate very long lineages. In the current implementation, Spark will save - * this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done. - * Hence, it is strongly recommended to use checkpoint() on RDDs when - * (i) checkpoint() is called before the any job has been executed on this RDD. - * (ii) This RDD has been made to persist in memory. Otherwise saving it on a file will - * require recomputation. + + /** + * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. This function must be called before any job has been + * executed on this RDD. It is strongly recommended that this RDD is persisted in + * memory, otherwise saving it on a file will require recomputation. */ def checkpoint() = rdd.checkpoint() diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index fa2f14113d7c8e638feb2eccfa2ea2af609130f7..14699961adbaa1a3ddb4da92f41a408b56b6b6fc 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -357,20 +357,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork } /** - * Set the directory under which RDDs are going to be checkpointed. This method will - * create this directory and will throw an exception of the path already exists (to avoid - * overwriting existing files may be overwritten). The directory will be deleted on exit - * if indicated. + * Set the directory under which RDDs are going to be checkpointed. The directory must + * be a HDFS path if running on a cluster. If the directory does not exist, it will + * be created. If the directory exists and useExisting is set to true, then the + * exisiting directory will be used. Otherwise an exception will be thrown to + * prevent accidental overriding of checkpoint files in the existing directory. */ def setCheckpointDir(dir: String, useExisting: Boolean) { sc.setCheckpointDir(dir, useExisting) } /** - * Set the directory under which RDDs are going to be checkpointed. This method will - * create this directory and will throw an exception of the path already exists (to avoid - * overwriting existing files may be overwritten). The directory will be deleted on exit - * if indicated. + * Set the directory under which RDDs are going to be checkpointed. The directory must + * be a HDFS path if running on a cluster. If the directory does not exist, it will + * be created. If the directory exists, an exception will be thrown to prevent accidental + * overriding of checkpoint files. */ def setCheckpointDir(dir: String) { sc.setCheckpointDir(dir) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 8beb8e2ae91a4ea71b9695799c63d44c770e2386..dcbed37270fc319b6d3aa39c2e77b0cff5cc7df8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -202,9 +202,12 @@ class SparkContext(object): def setCheckpointDir(self, dirName, useExisting=False): """ - Set the directory under which RDDs are going to be checkpointed. This - method will create this directory and will throw an exception of the - path already exists (to avoid overwriting existing files may be - overwritten). The directory will be deleted on exit if indicated. + Set the directory under which RDDs are going to be checkpointed. The + directory must be a HDFS path if running on a cluster. + + If the directory does not exist, it will be created. If the directory + exists and C{useExisting} is set to true, then the exisiting directory + will be used. Otherwise an exception will be thrown to prevent + accidental overriding of checkpoint files in the existing directory. """ self._jsc.sc().setCheckpointDir(dirName, useExisting) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 2a2ff9b271d7d7930c97b841cd7acc0c103159d6..7b6ab956ee699e4199aa1006c0f3c195cff7f8db 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -52,18 +52,11 @@ class RDD(object): def checkpoint(self): """ - Mark this RDD for checkpointing. The RDD will be saved to a file inside - `checkpointDir` (set using setCheckpointDir()) and all references to - its parent RDDs will be removed. This is used to truncate very long - lineages. In the current implementation, Spark will save this RDD to - a file (using saveAsObjectFile()) after the first job using this RDD is - done. Hence, it is strongly recommended to use checkpoint() on RDDs - when - - (i) checkpoint() is called before the any job has been executed on this - RDD. - - (ii) This RDD has been made to persist in memory. Otherwise saving it + Mark this RDD for checkpointing. It will be saved to a file inside the + checkpoint directory set with L{SparkContext.setCheckpointDir()} and + all references to its parent RDDs will be removed. This function must + be called before any job has been executed on this RDD. It is strongly + recommended that this RDD is persisted in memory, otherwise saving it on a file will require recomputation. """ self.is_checkpointed = True