From 97a8239a625df455d2c439f3628a529d6d9413ca Mon Sep 17 00:00:00 2001
From: Burak Yavuz <brkyvz@gmail.com>
Date: Mon, 21 Nov 2016 17:24:02 -0800
Subject: [PATCH] [SPARK-18493] Add missing python APIs: withWatermark and
 checkpoint to dataframe

## What changes were proposed in this pull request?

This PR adds two of the newly added methods of `Dataset`s to Python:
`withWatermark` and `checkpoint`

## How was this patch tested?

Doc tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15921 from brkyvz/py-watermark.
---
 python/pyspark/sql/dataframe.py               | 57 ++++++++++++++++++-
 .../scala/org/apache/spark/sql/Dataset.scala  | 10 +++-
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3899890083..6fe6226432 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -322,6 +322,54 @@ class DataFrame(object):
     def __repr__(self):
         return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
+    @since(2.1)
+    def checkpoint(self, eager=True):
+        """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+        logical plan of this DataFrame, which is especially useful in iterative algorithms where the
+        plan may grow exponentially. It will be saved to files inside the checkpoint
+        directory set with L{SparkContext.setCheckpointDir()}.
+
+        :param eager: Whether to checkpoint this DataFrame immediately
+
+        .. note:: Experimental
+        """
+        jdf = self._jdf.checkpoint(eager)
+        return DataFrame(jdf, self.sql_ctx)
+
+    @since(2.1)
+    def withWatermark(self, eventTime, delayThreshold):
+        """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
+        in time before which we assume no more late data is going to arrive.
+
+        Spark will use this watermark for several purposes:
+          - To know when a given time window aggregation can be finalized and thus can be emitted
+            when using output modes that do not allow updates.
+
+          - To minimize the amount of state that we need to keep for on-going aggregations.
+
+        The current watermark is computed by looking at the `MAX(eventTime)` seen across
+        all of the partitions in the query minus a user specified `delayThreshold`. Due to the cost
+        of coordinating this value across partitions, the actual watermark used is only guaranteed
+        to be at least `delayThreshold` behind the actual event time. In some cases we may still
+        process records that arrive more than `delayThreshold` late.
+
+        :param eventTime: the name of the column that contains the event time of the row.
+        :param delayThreshold: the minimum delay to wait to data to arrive late, relative to the
+            latest record that has been processed in the form of an interval
+            (e.g. "1 minute" or "5 hours").
+
+        .. note:: Experimental
+
+        >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+        DataFrame[name: string, time: timestamp]
+        """
+        if not eventTime or type(eventTime) is not str:
+            raise TypeError("eventTime should be provided as a string")
+        if not delayThreshold or type(delayThreshold) is not str:
+            raise TypeError("delayThreshold should be provided as a string interval")
+        jdf = self._jdf.withWatermark(eventTime, delayThreshold)
+        return DataFrame(jdf, self.sql_ctx)
+
     @since(1.3)
     def count(self):
         """Returns the number of rows in this :class:`DataFrame`.
@@ -1626,6 +1674,7 @@ def _test():
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext, SparkSession
     import pyspark.sql.dataframe
+    from pyspark.sql.functions import from_unixtime
     globs = pyspark.sql.dataframe.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
@@ -1638,9 +1687,11 @@ def _test():
     globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                    Row(name='Bob', age=5)]).toDF()
     globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
-                                  Row(name='Bob', age=5, height=None),
-                                  Row(name='Tom', age=None, height=None),
-                                  Row(name=None, age=None, height=None)]).toDF()
+                                   Row(name='Bob', age=5, height=None),
+                                   Row(name='Tom', age=None, height=None),
+                                   Row(name=None, age=None, height=None)]).toDF()
+    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
+                                   Row(name='Bob', time=1479442946)]).toDF()
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe, globs=globs,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c75a6a45e..7ba6ffce27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -485,7 +485,10 @@ class Dataset[T] private[sql](
   def isStreaming: Boolean = logicalPlan.isStreaming
 
   /**
-   * Returns a checkpointed version of this Dataset.
+   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
+   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. It will be saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
    * @since 2.1.0
@@ -495,7 +498,10 @@ class Dataset[T] private[sql](
   def checkpoint(): Dataset[T] = checkpoint(eager = true)
 
   /**
-   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. It will be saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
   * @since 2.1.0
-- 
GitLab
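
For anyone trying the two new Python methods locally, below is a minimal usage sketch (not part of the patch). It assumes a local SparkSession named `spark`; the checkpoint directory path and the app name are illustrative only, and the sample rows mirror the `sdf` doctest glob added above.

    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import col

    # Assumed setup: a local session and an example checkpoint directory.
    spark = SparkSession.builder.master("local[2]").appName("spark-18493-demo").getOrCreate()
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path

    # Same sample rows as the doctest glob `sdf` introduced by this patch.
    sdf = spark.createDataFrame([Row(name='Tom', time=1479441846),
                                 Row(name='Bob', time=1479442946)])

    # checkpoint(): materializes the DataFrame under the checkpoint directory
    # and truncates its logical plan.
    checkpointed = sdf.checkpoint(eager=True)

    # withWatermark(): declares `time` as the event-time column and tolerates
    # data arriving up to 10 minutes behind the latest event time seen so far.
    watermarked = (sdf.select('name', col('time').cast('timestamp'))
                      .withWatermark('time', '10 minutes'))
    print(watermarked)  # DataFrame[name: string, time: timestamp]

    spark.stop()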