From 406f33987ac078fb20d2f5e81b7e1f646ea53fed Mon Sep 17 00:00:00 2001 From: Gabriel Huang <gabi.xiaohuang@gmail.com> Date: Mon, 21 Nov 2016 16:08:34 -0500 Subject: [PATCH] [SPARK-18361][PYSPARK] Expose RDD localCheckpoint in PySpark ## What changes were proposed in this pull request? Expose RDD's localCheckpoint() and associated functions in PySpark. ## How was this patch tested? I added a UnitTest in python/pyspark/tests.py which passes. I certify that this is my original work, and I license it to the project under the project's open source license. Gabriel HUANG Developer at Cardabel (http://cardabel.com/) Author: Gabriel Huang <gabi.xiaohuang@gmail.com> Closes #15811 from gabrielhuang/pyspark-localcheckpoint. --- python/pyspark/rdd.py | 33 ++++++++++++++++++++++++++++++++- python/pyspark/tests.py | 17 +++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 641787ee20..f21a364df9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -263,13 +263,44 @@ class RDD(object): def isCheckpointed(self): """ - Return whether this RDD has been checkpointed or not + Return whether this RDD is checkpointed and materialized, either reliably or locally. """ return self._jrdd.rdd().isCheckpointed() + def localCheckpoint(self): + """ + Mark this RDD for local checkpointing using Spark's existing caching layer. + + This method is for users who wish to truncate RDD lineages while skipping the expensive + step of replicating the materialized data in a reliable distributed file system. This is + useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX). + + Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed + data is written to ephemeral local storage in the executors instead of to a reliable, + fault-tolerant storage. The effect is that if an executor fails during the computation, + the checkpointed data may no longer be accessible, causing an irrecoverable job failure. + + This is NOT safe to use with dynamic allocation, which removes executors along + with their cached blocks. If you must use both features, you are advised to set + L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value. + + The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used. + """ + self._jrdd.rdd().localCheckpoint() + + def isLocallyCheckpointed(self): + """ + Return whether this RDD is marked for local checkpointing. + + Exposed for testing. + """ + return self._jrdd.rdd().isLocallyCheckpointed() + def getCheckpointFile(self): """ Gets the name of the file to which this RDD was checkpointed + + Not defined if RDD is checkpointed locally. """ checkpointFile = self._jrdd.rdd().getCheckpointFile() if checkpointFile.isDefined(): diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3e0bd16d85..ab4bef8329 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -390,6 +390,23 @@ class CheckpointTests(ReusedPySparkTestCase): self.assertEqual([1, 2, 3, 4], recovered.collect()) +class LocalCheckpointTests(ReusedPySparkTestCase): + + def test_basic_localcheckpointing(self): + parCollection = self.sc.parallelize([1, 2, 3, 4]) + flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1)) + + self.assertFalse(flatMappedRDD.isCheckpointed()) + self.assertFalse(flatMappedRDD.isLocallyCheckpointed()) + + flatMappedRDD.localCheckpoint() + result = flatMappedRDD.collect() + time.sleep(1) # 1 second + self.assertTrue(flatMappedRDD.isCheckpointed()) + self.assertTrue(flatMappedRDD.isLocallyCheckpointed()) + self.assertEqual(flatMappedRDD.collect(), result) + + class AddFileTests(PySparkTestCase): def test_add_py_file(self): -- GitLab