diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 641787ee20e0cb2025204170c17e0dd760a30bb6..f21a364df910015e9cfba6e67003b7c40a39a13c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -263,13 +263,44 @@ class RDD(object):
 
     def isCheckpointed(self):
         """
-        Return whether this RDD has been checkpointed or not
+        Return whether this RDD is checkpointed and materialized, either reliably or locally.
         """
         return self._jrdd.rdd().isCheckpointed()
 
+    def localCheckpoint(self):
+        """
+        Mark this RDD for local checkpointing using Spark's existing caching layer.
+
+        This method is for users who wish to truncate RDD lineages while skipping the expensive
+        step of replicating the materialized data in a reliable distributed file system. This is
+        useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
+
+        Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
+        data is written to ephemeral local storage in the executors instead of to a reliable,
+        fault-tolerant storage. The effect is that if an executor fails during the computation,
+        the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
+
+        This is NOT safe to use with dynamic allocation, which removes executors along
+        with their cached blocks. If you must use both features, you are advised to set
+        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+
+        The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used.
+        """
+        self._jrdd.rdd().localCheckpoint()
+
+    def isLocallyCheckpointed(self):
+        """
+        Return whether this RDD is marked for local checkpointing.
+
+        Exposed for testing.
+        """
+        return self._jrdd.rdd().isLocallyCheckpointed()
+
     def getCheckpointFile(self):
         """
         Gets the name of the file to which this RDD was checkpointed
+
+        Not defined if RDD is checkpointed locally.
         """
         checkpointFile = self._jrdd.rdd().getCheckpointFile()
         if checkpointFile.isDefined():
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3e0bd16d85ca4f4f6be1bcebd93e8d6b2e9c9430..ab4bef8329cd00c2e9ba113eac580edc42430f2e 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -390,6 +390,23 @@ class CheckpointTests(ReusedPySparkTestCase):
         self.assertEqual([1, 2, 3, 4], recovered.collect())
 
 
+class LocalCheckpointTests(ReusedPySparkTestCase):
+
+    def test_basic_localcheckpointing(self):
+        parCollection = self.sc.parallelize([1, 2, 3, 4])
+        flatMappedRDD = parCollection.flatMap(lambda x: range(1, x + 1))
+
+        self.assertFalse(flatMappedRDD.isCheckpointed())
+        self.assertFalse(flatMappedRDD.isLocallyCheckpointed())
+
+        flatMappedRDD.localCheckpoint()
+        result = flatMappedRDD.collect()
+        time.sleep(1)  # 1 second
+        self.assertTrue(flatMappedRDD.isCheckpointed())
+        self.assertTrue(flatMappedRDD.isLocallyCheckpointed())
+        self.assertEqual(flatMappedRDD.collect(), result)
+
+
 class AddFileTests(PySparkTestCase):
 
     def test_add_py_file(self):