-
- Downloads
[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters...
[SPARK-18339][SPARK-18513][SQL] Don't push down current_timestamp for filters in StructuredStreaming and persist batch and watermark timestamps to offset log. ## What changes were proposed in this pull request? For the following workflow: 1. I have a column called time which is at minute level precision in a Streaming DataFrame 2. I want to perform groupBy time, count 3. Then I want my MemorySink to only have the last 30 minutes of counts and I perform this by .where('time >= current_timestamp().cast("long") - 30 * 60) what happens is that the `filter` gets pushed down before the aggregation, and the filter happens on the source data for the aggregation instead of the result of the aggregation (where I actually want to filter). I guess the main issue here is that `current_timestamp` is non-deterministic in the streaming context and shouldn't be pushed down the filter. Does this require us to store the `current_timestamp` for each trigger of the streaming job, that is something to discuss. Furthermore, we want to persist current batch timestamp and watermark timestamp to the offset log so that these values are consistent across multiple executions of the same batch. brkyvz zsxwing tdas ## How was this patch tested? A test was added to StreamingAggregationSuite ensuring the above use case is handled. The test injects a stream of time values (in seconds) to a query that runs in complete mode and only outputs the (count) aggregation results for the past 10 seconds. Author: Tyson Condie <tcondie@gmail.com> Closes #15949 from tcondie/SPARK-18339. (cherry picked from commit 3c0beea4) Signed-off-by:Tathagata Das <tathagata.das1565@gmail.com>
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala 31 additions, 2 deletions.../spark/sql/catalyst/expressions/datetimeExpressions.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala 17 additions, 2 deletions.../spark/sql/execution/streaming/IncrementalExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 55 additions, 12 deletions...pache/spark/sql/execution/streaming/StreamExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala 2 additions, 2 deletions...apache/spark/sql/execution/streaming/StreamProgress.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 4 additions, 0 deletions...ala/org/apache/spark/sql/execution/streaming/memory.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamExecutionMetadataSuite.scala 35 additions, 0 deletions...he/spark/sql/streaming/StreamExecutionMetadataSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala 100 additions, 0 deletions...pache/spark/sql/streaming/StreamingAggregationSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala 2 additions, 2 deletions.../org/apache/spark/sql/streaming/StreamingQuerySuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala 27 additions, 13 deletions...scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
Loading
Please register or sign in to comment