Commit 6dbe4489 authored by Burak Yavuz, committed by Shixiong Zhu

[SPARK-18493] Add missing python APIs: withWatermark and checkpoint to dataframe


## What changes were proposed in this pull request?

This PR adds two of the newly added methods of `Dataset`s to Python:
`withWatermark` and `checkpoint`

## How was this patch tested?

Doc tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #15921 from brkyvz/py-watermark.

(cherry picked from commit 97a8239a)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>
parent 2afc18be
@@ -322,6 +322,54 @@ class DataFrame(object):
    def __repr__(self):
        return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))

    @since(2.1)
    def checkpoint(self, eager=True):
        """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
        logical plan of this DataFrame, which is especially useful in iterative algorithms where the
        plan may grow exponentially. It will be saved to files inside the checkpoint
        directory set with L{SparkContext.setCheckpointDir()}.

        :param eager: Whether to checkpoint this DataFrame immediately

        .. note:: Experimental
        """
        jdf = self._jdf.checkpoint(eager)
        return DataFrame(jdf, self.sql_ctx)
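Editor's aside: a minimal usage sketch of the new Python checkpoint API. It is not part of this commit; the local master, app name, and /tmp/spark-checkpoints path are illustrative assumptions.

# Illustrative sketch only -- not part of this commit.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("checkpoint-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # must be set before checkpoint()

df = spark.range(1000)
for i in range(5):
    # Each pass adds a projection, so the logical plan keeps growing.
    df = df.withColumn("step", F.col("id") + i)
    # Truncate the accumulated plan by materializing to the checkpoint directory.
    df = df.checkpoint(eager=True)

print(df.count())
spark.stop()

Checkpointing inside the loop keeps the plan bounded, at the cost of writing intermediate data to the checkpoint directory on each iteration.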
    @since(2.1)
    def withWatermark(self, eventTime, delayThreshold):
        """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
        in time before which we assume no more late data is going to arrive.

        Spark will use this watermark for several purposes:
          - To know when a given time window aggregation can be finalized and thus can be emitted
            when using output modes that do not allow updates.
          - To minimize the amount of state that we need to keep for ongoing aggregations.

        The current watermark is computed by looking at the `MAX(eventTime)` seen across
        all of the partitions in the query minus a user-specified `delayThreshold`. Due to the cost
        of coordinating this value across partitions, the actual watermark used is only guaranteed
        to be at least `delayThreshold` behind the actual event time. In some cases we may still
        process records that arrive more than `delayThreshold` late.

        :param eventTime: the name of the column that contains the event time of the row.
        :param delayThreshold: the minimum delay to wait for data to arrive late, relative to the
            latest record that has been processed, in the form of an interval
            (e.g. "1 minute" or "5 hours").

        .. note:: Experimental

        >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
        DataFrame[name: string, time: timestamp]
        """
        if not eventTime or type(eventTime) is not str:
            raise TypeError("eventTime should be provided as a string")
        if not delayThreshold or type(delayThreshold) is not str:
            raise TypeError("delayThreshold should be provided as a string interval")
        jdf = self._jdf.withWatermark(eventTime, delayThreshold)
        return DataFrame(jdf, self.sql_ctx)
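Editor's aside: a hedged end-to-end sketch of how withWatermark is typically combined with a windowed streaming aggregation. The socket source on localhost:9999 and the console sink are assumptions, not part of this commit.

# Illustrative sketch only -- not part of this commit.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("watermark-demo").getOrCreate()

# Toy streaming source: each line read from the socket gets a `timestamp` column.
lines = spark.readStream.format("socket") \
    .option("host", "localhost").option("port", 9999) \
    .option("includeTimestamp", "true").load()

# Keep aggregation state only for windows no more than 10 minutes behind
# MAX(timestamp), counting events in 5-minute windows.
counts = lines.withWatermark("timestamp", "10 minutes") \
    .groupBy(window(col("timestamp"), "5 minutes")) \
    .count()

# Append mode emits a window's count only after the watermark passes its end.
query = counts.writeStream.outputMode("append").format("console").start()
query.awaitTermination()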
    @since(1.3)
    def count(self):
        """Returns the number of rows in this :class:`DataFrame`.
@@ -1626,6 +1674,7 @@ def _test():
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext, SparkSession
    import pyspark.sql.dataframe
    from pyspark.sql.functions import from_unixtime
    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
@@ -1638,9 +1687,11 @@ def _test():
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                   Row(name='Bob', age=5)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()
    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
                                   Row(name='Bob', time=1479442946)]).toDF()
    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
@@ -485,7 +485,10 @@ class Dataset[T] private[sql](
  def isStreaming: Boolean = logicalPlan.isStreaming

  /**
   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
   * plan may grow exponentially. It will be saved to files inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir`.
   *
   * @group basic
   * @since 2.1.0
@@ -495,7 +498,10 @@ class Dataset[T] private[sql](
  def checkpoint(): Dataset[T] = checkpoint(eager = true)

  /**
   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
   * plan may grow exponentially. It will be saved to files inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir`.
   *
   * @group basic
   * @since 2.1.0
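The eager and lazy checkpoint variants documented above differ only in when the data is written. A small Python-side illustration of that distinction (assumes an existing `spark` session and that setCheckpointDir() has already been called; not part of this commit):

# Illustration only; assumes `spark` exists and a checkpoint directory is set.
df = spark.range(100).selectExpr("id", "id * 2 AS doubled")

eager_cp = df.checkpoint()            # default eager=True: materializes right away
lazy_cp = df.checkpoint(eager=False)  # plan is truncated, but data is written lazily
lazy_cp.count()                       # first action triggers the actual checkpoint write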