[SPARK-21153] Use project instead of expand in tumbling windows
## What changes were proposed in this pull request?

Time windowing in Spark currently performs an Expand + Filter, because in the general case there is no way to guarantee how many windows a timestamp will fall into. For tumbling windows, however, a record is guaranteed to fall into exactly one bucket. In that case, doubling the number of records with Expand is wasteful, and the same result can be computed with a simple Projection instead. Benchmarks show an order-of-magnitude performance improvement after this patch.

## How was this patch tested?

Existing unit tests. Benchmarked using the following code:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for the 'id / 'time symbol syntax (in scope by default in spark-shell)

val numRecords = 1000000000L // 1 B rows, per the setup below

spark.time {
  spark.range(numRecords)
    .select(from_unixtime((current_timestamp().cast("long") * 1000 + 'id / 1000) / 1000) as 'time)
    .select(window('time, "10 seconds"))
    .count()
}
```

Setup:
- 1 c3.2xlarge worker (8 cores)

1 B rows ran in 287 seconds after this optimization; I didn't wait for the run without the optimization to finish. For large record counts this shows about a 5x improvement.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #18364 from brkyvz/opt-tumble.
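To see why a plain projection suffices here, consider a minimal sketch (this is not the actual Analyzer rule from this patch, and the function name and parameters are illustrative): when the slide duration equals the window duration, each timestamp falls into exactly one bucket, whose bounds are plain column arithmetic.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Illustrative only: compute the single bucket a timestamp belongs to for a
// tumbling window of `windowSeconds` seconds. Because the bucket is unique,
// this is a pure projection -- no Expand + Filter is needed.
def tumblingWindow(ts: Column, windowSeconds: Long): (Column, Column) = {
  val seconds = ts.cast("long")
  // Round down to the start of the containing bucket.
  val start = seconds - pmod(seconds, lit(windowSeconds))
  (start.cast("timestamp"), (start + windowSeconds).cast("timestamp"))
}
```

The rule changed in Analyzer.scala performs the equivalent rewrite on the TimeWindow expression during analysis; this snippet only mirrors the bucket arithmetic at the DataFrame API level.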
Showing 3 changed files with 94 additions and 39 deletions
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (50 additions, 22 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala (8 additions, 4 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala (36 additions, 13 deletions)