From b6d348eea1ac51fe8081f4cc5969a85a1ae7a05b Mon Sep 17 00:00:00 2001
From: Herman van Hovell <hvanhovell@databricks.com>
Date: Sun, 26 Mar 2017 22:47:31 +0200
Subject: [PATCH] [SPARK-20086][SQL] CollapseWindow should not collapse
 dependent adjacent windows

## What changes were proposed in this pull request?
The `CollapseWindow` is currently to aggressive when collapsing adjacent windows. It also collapses windows in the which the parent produces a column that is consumed by the child; this creates an invalid window which will fail at runtime.

This PR fixes this by adding a check for dependent adjacent windows to the `CollapseWindow` rule.

## How was this patch tested?
Added a new test case to `CollapseWindowSuite`

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #17432 from hvanhovell/SPARK-20086.

(cherry picked from commit 617ab6445ea33d8297f0691723fd19bae19228dc)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala      |  8 +++++---
 .../sql/catalyst/optimizer/CollapseWindowSuite.scala  | 11 +++++++++++
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index de37320610..1ca4dba0b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -592,12 +592,14 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 
 /**
  * Collapse Adjacent Window Expression.
- * - If the partition specs and order specs are the same, collapse into the parent.
+ * - If the partition specs and order specs are the same and the window expression are
+ *   independent, collapse into the parent.
  */
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-      w.copy(windowExpressions = we2 ++ we1, child = grandChild)
+    case w1 @ Window(we1, ps1, os1, w2 @ Window(we2, ps2, os2, grandChild))
+        if ps1 == ps2 && os1 == os2 && w1.references.intersect(w2.windowOutputSet).isEmpty =>
+      w1.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 3f7d1d9fd9..52054c2f8b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -78,4 +78,15 @@ class CollapseWindowSuite extends PlanTest {
 
     comparePlans(optimized2, correctAnswer2)
   }
+
+  test("Don't collapse adjacent windows with dependent columns") {
+    val query = testRelation
+      .window(Seq(sum(a).as('sum_a)), partitionSpec1, orderSpec1)
+      .window(Seq(max('sum_a).as('max_sum_a)), partitionSpec1, orderSpec1)
+      .analyze
+
+    val expected = query.analyze
+    val optimized = Optimize.execute(query.analyze)
+    comparePlans(optimized, expected)
+  }
 }
-- 
GitLab