-
- Downloads
[SPARK-17739][SQL] Collapse adjacent similar Window operators
## What changes were proposed in this pull request? Currently, Spark does not collapse adjacent windows with the same partitioning and sorting. This PR implements `CollapseWindow` optimizer to do the followings. 1. If the partition specs and order specs are the same, collapse into the parent. 2. If the partition specs are the same and one order spec is a prefix of the other, collapse to the more specific one. For example: ```scala val df = spark.range(1000).select($"id" % 100 as "grp", $"id", rand() as "col1", rand() as "col2") // Add summary statistics for all columns import org.apache.spark.sql.expressions.Window val cols = Seq("id", "col1", "col2") val window = Window.partitionBy($"grp").orderBy($"id") val result = cols.foldLeft(df) { (base, name) => base.withColumn(s"${name}_avg", avg(col(name)).over(window)) .withColumn(s"${name}_stddev", stddev(col(name)).over(window)) .withColumn(s"${name}_min", min(col(name)).over(window)) .withColumn(s"${name}_max", max(col(name)).over(window)) } ``` **Before** ```scala scala> result.explain == Physical Plan == Window [max(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#234], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#216], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [stddev_samp(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#191], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [avg(col2#19) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#167], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [max(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#152], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#138], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [stddev_samp(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#117], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [avg(col1#18) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#97], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [max(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#86L], [grp#17L], [id#14L ASC NULLS FIRST] +- Window [min(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#76L], [grp#17L], [id#14L ASC NULLS FIRST] +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, id_stddev#42] +- Window [stddev_samp(_w0#59) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#42], [grp#17L], [id#14L ASC NULLS FIRST] +- *Project [grp#17L, id#14L, col1#18, col2#19, id_avg#26, cast(id#14L as double) AS _w0#59] +- Window [avg(id#14L) windowspecdefinition(grp#17L, id#14L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#26], [grp#17L], [id#14L ASC NULLS FIRST] +- *Sort [grp#17L ASC NULLS FIRST, id#14L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(grp#17L, 200) +- *Project [(id#14L % 100) AS grp#17L, id#14L, rand(-6329949029880411066) AS col1#18, rand(-7251358484380073081) AS col2#19] +- *Range (0, 1000, step=1, splits=Some(8)) ``` **After** ```scala scala> result.explain == Physical Plan == Window [max(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_max#220, min(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_min#202, stddev_samp(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_stddev#177, avg(col2#5) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col2_avg#153, max(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_max#138, min(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_min#124, stddev_samp(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_stddev#103, avg(col1#4) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS col1_avg#83, max(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_max#72L, min(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_min#62L], [grp#3L], [id#0L ASC NULLS FIRST] +- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, id_stddev#28] +- Window [stddev_samp(_w0#45) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_stddev#28], [grp#3L], [id#0L ASC NULLS FIRST] +- *Project [grp#3L, id#0L, col1#4, col2#5, id_avg#12, cast(id#0L as double) AS _w0#45] +- Window [avg(id#0L) windowspecdefinition(grp#3L, id#0L ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS id_avg#12], [grp#3L], [id#0L ASC NULLS FIRST] +- *Sort [grp#3L ASC NULLS FIRST, id#0L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(grp#3L, 200) +- *Project [(id#0L % 100) AS grp#3L, id#0L, rand(6537478539664068821) AS col1#4, rand(-8961093871295252795) AS col2#5] +- *Range (0, 1000, step=1, splits=Some(8)) ``` ## How was this patch tested? Pass the Jenkins tests with a newly added testsuite. Author: Dongjoon Hyun <dongjoon@apache.org> Closes #15317 from dongjoon-hyun/SPARK-17739.
Showing
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala 12 additions, 0 deletions...a/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala 78 additions, 0 deletions...he/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
Please register or sign in to comment