diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 38998900837cf4ec7815bfa2366c8c1d64b7acb5..6fe622643291e59bfd5213560373cf591128081c 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -322,6 +322,54 @@ class DataFrame(object):
     def __repr__(self):
         return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
+    @since(2.1)
+    def checkpoint(self, eager=True):
+        """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+        logical plan of this DataFrame, which is especially useful in iterative algorithms where the
+        plan may grow exponentially. It will be saved to files inside the checkpoint
+        directory set with L{SparkContext.setCheckpointDir()}.
+
+        :param eager: Whether to checkpoint this DataFrame immediately (``True``) or
+            lazily (``False``)
+
+        .. note:: Experimental
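+
+        A minimal usage sketch (illustrative only; the checkpoint directory below is an assumed
+        path and must be writable in your environment):
+
+        >>> sc.setCheckpointDir("/tmp/spark-checkpoint")  # doctest: +SKIP
+        >>> sdf.checkpoint().count()  # doctest: +SKIP
+        2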
+        """
+        jdf = self._jdf.checkpoint(eager)
+        return DataFrame(jdf, self.sql_ctx)
+
+    @since(2.1)
+    def withWatermark(self, eventTime, delayThreshold):
+        """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
+        in time before which we assume no more late data is going to arrive.
+
+        Spark will use this watermark for several purposes:
+          - To know when a given time window aggregation can be finalized and thus can be emitted
+            when using output modes that do not allow updates.
+
+          - To minimize the amount of state that we need to keep for on-going aggregations.
+
+        The current watermark is computed by looking at the `MAX(eventTime)` seen across
+        all of the partitions in the query minus a user-specified `delayThreshold`.  Due to the cost
+        of coordinating this value across partitions, the actual watermark used is only guaranteed
+        to be at least `delayThreshold` behind the actual event time.  In some cases we may still
+        process records that arrive more than `delayThreshold` late.
+
+        :param eventTime: the name of the column that contains the event time of the row.
+        :param delayThreshold: the minimum delay to wait for late data to arrive, relative to the
+            latest record that has been processed, expressed as an interval string
+            (e.g. "1 minute" or "5 hours").
+
+        .. note:: Experimental
+
+        >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+        DataFrame[name: string, time: timestamp]
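+
+        A sketch of how a watermark is typically combined with a windowed aggregation on a
+        streaming :class:`DataFrame` (illustrative only and not executed here; ``streaming_df``
+        is an assumed streaming source with a ``timestamp`` column):
+
+        >>> from pyspark.sql.functions import window  # doctest: +SKIP
+        >>> watermarked = streaming_df.withWatermark('timestamp', '10 minutes')  # doctest: +SKIP
+        >>> watermarked.groupBy(window('timestamp', '5 minutes')).count()  # doctest: +SKIP
+        DataFrame[window: struct<start:timestamp,end:timestamp>, count: bigint]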
+        """
+        if not eventTime or not isinstance(eventTime, str):
+            raise TypeError("eventTime should be provided as a string")
+        if not delayThreshold or not isinstance(delayThreshold, str):
+            raise TypeError("delayThreshold should be provided as a string interval")
+        jdf = self._jdf.withWatermark(eventTime, delayThreshold)
+        return DataFrame(jdf, self.sql_ctx)
+
     @since(1.3)
     def count(self):
         """Returns the number of rows in this :class:`DataFrame`.
@@ -1626,6 +1674,7 @@ def _test():
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext, SparkSession
     import pyspark.sql.dataframe
+    from pyspark.sql.functions import from_unixtime
     globs = pyspark.sql.dataframe.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
@@ -1638,9 +1687,11 @@ def _test():
     globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                    Row(name='Bob', age=5)]).toDF()
     globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
-                                  Row(name='Bob', age=5, height=None),
-                                  Row(name='Tom', age=None, height=None),
-                                  Row(name=None, age=None, height=None)]).toDF()
+                                   Row(name='Bob', age=5, height=None),
+                                   Row(name='Tom', age=None, height=None),
+                                   Row(name=None, age=None, height=None)]).toDF()
+    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
+                                   Row(name='Bob', time=1479442946)]).toDF()
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe, globs=globs,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c75a6a45ec86bc3592984916345c3f3a61ab168..7ba6ffce278cf558968d79f6e87a5914e485ba45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -485,7 +485,10 @@ class Dataset[T] private[sql](
   def isStreaming: Boolean = logicalPlan.isStreaming
 
   /**
-   * Returns a checkpointed version of this Dataset.
+   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
+   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. The checkpointed data will be saved to files inside the
+   * checkpoint directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
    * @since 2.1.0
@@ -495,7 +498,10 @@ class Dataset[T] private[sql](
   def checkpoint(): Dataset[T] = checkpoint(eager = true)
 
   /**
-   * Returns a checkpointed version of this Dataset.
+   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. The checkpointed data will be saved to files inside the
+   * checkpoint directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
    * @since 2.1.0