-
- Downloads
[SPARK-14160] Time Windowing functions for Datasets
## What changes were proposed in this pull request? This PR adds the function `window` as a column expression. `window` can be used to bucket rows into time windows given a time column. With this expression, performing time series analysis on batch data, as well as streaming data should become much more simpler. ### Usage Assume the following schema: `sensor_id, measurement, timestamp` To average 5 minute data every 1 minute (window length of 5 minutes, slide duration of 1 minute), we will use: ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:00-09:05:00 09:01:00-09:06:00 09:02:00-09:07:00 ... ``` Intervals will start at every `slideDuration` starting at the unix epoch (1970-01-01 00:00:00 UTC). To start intervals at a different point of time, e.g. 30 seconds after a minute, the `startTime` parameter can be used. ```scala df.groupBy(window("timestamp", “5 minutes”, “1 minute”, "30 second"), "sensor_id") .agg(mean("measurement").as("avg_meas")) ``` This will generate windows such as: ``` 09:00:30-09:05:30 09:01:30-09:06:30 09:02:30-09:07:30 ... ``` Support for Python will be made in a follow up PR after this. ## How was this patch tested? This patch has some basic unit tests for the `TimeWindow` expression testing that the parameters pass validation, and it also has some unit/integration tests testing the correctness of the windowing and usability in complex operations (multi-column grouping, multi-column projections, joins). Author: Burak Yavuz <brkyvz@gmail.com> Author: Michael Armbrust <michael@databricks.com> Closes #12008 from brkyvz/df-time-window.
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 90 additions, 0 deletions...ala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala 1 addition, 0 deletions...apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala 133 additions, 0 deletions...rg/apache/spark/sql/catalyst/expressions/TimeWindow.scala
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala 56 additions, 0 deletions...ache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala 76 additions, 0 deletions...ache/spark/sql/catalyst/expressions/TimeWindowSuite.scala
- sql/core/src/main/scala/org/apache/spark/sql/functions.scala 137 additions, 0 deletionssql/core/src/main/scala/org/apache/spark/sql/functions.scala
- sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala 242 additions, 0 deletions...la/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala
Loading
Please register or sign in to comment