Commit 617ab644 authored by Herman van Hovell

[SPARK-20086][SQL] CollapseWindow should not collapse dependent adjacent windows

## What changes were proposed in this pull request?
The `CollapseWindow` rule is currently too aggressive when collapsing adjacent windows. It also collapses windows in which the parent consumes a column that is produced by the child; this creates an invalid window which will fail at runtime.

This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule.
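
A minimal, self-contained sketch of the dependency check, for illustration only. It does not use Catalyst: `WinExpr`, `Plan`, `Relation`, the simplified `Window`, and `CollapseWindowSketch` are hypothetical stand-ins, and columns are modelled as plain strings. The only part taken from the patch is the guard `w1.references.intersect(w2.windowOutputSet).isEmpty`.

```scala
// Hypothetical stand-in for a window expression: the column it
// produces and the columns it reads.
final case class WinExpr(output: String, inputs: Set[String])

// Hypothetical, heavily simplified plan nodes (not Spark's LogicalPlan).
sealed trait Plan
final case class Relation(columns: Set[String]) extends Plan
final case class Window(
    exprs: Seq[WinExpr],
    partitionBy: Seq[String],
    orderBy: Seq[String],
    child: Plan) extends Plan {
  // Every column this node reads (assumed to include the specs).
  def references: Set[String] =
    exprs.flatMap(_.inputs).toSet ++ partitionBy ++ orderBy
  // Columns this node adds on top of its child's output.
  def windowOutputSet: Set[String] = exprs.map(_.output).toSet
}

object CollapseWindowSketch {
  // Bottom-up rewrite: collapse the child first, then try to merge
  // the parent into it.
  def collapse(plan: Plan): Plan = plan match {
    case w1 @ Window(we1, ps1, os1, child) =>
      collapse(child) match {
        // Merge only if the specs match AND the parent reads nothing
        // that the child window produces.
        case w2 @ Window(we2, ps2, os2, grandChild)
            if ps1 == ps2 && os1 == os2 &&
              w1.references.intersect(w2.windowOutputSet).isEmpty =>
          Window(we2 ++ we1, ps1, os1, grandChild)
        case other => w1.copy(child = other)
      }
    case other => other
  }
}
```

In this model, a parent computing `max(sum_a)` over a child that produces `sum_a` is left as two nodes, while a parent that reads only base columns is merged into a single node holding both expressions.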

## How was this patch tested?
Added a new test case to `CollapseWindowSuite`.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17432 from hvanhovell/SPARK-20086.
parent 362ee932
@@ -597,12 +597,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 /**
  * Collapse Adjacent Window Expression.
- * - If the partition specs and order specs are the same, collapse into the parent.
+ * - If the partition specs and order specs are the same and the window expression are
+ *   independent, collapse into the parent.
  */
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-      w.copy(windowExpressions = we2 ++ we1, child = grandChild)
+    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
+        if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty =>
+      w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
@@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest {
     comparePlans(optimized2, correctAnswer2)
   }
+  test("Don't collapse adjacent windows with dependent columns") {
+    val query = testRelation
+      .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1)
+      .analyze
+    val expected = query.analyze
+    val optimized = Optimize.execute(query.analyze)
+    comparePlans(optimized, expected)
+  }
 }