From 0fb73253fc832361d5d89ba85692ae653961e104 Mon Sep 17 00:00:00 2001
From: Jose Torres <joseph-torres@databricks.com>
Date: Wed, 9 Aug 2017 12:50:04 -0700
Subject: [PATCH] [SPARK-21587][SS] Added filter pushdown through watermarks.

## What changes were proposed in this pull request?

Push filter predicates through EventTimeWatermark if they are deterministic and do not reference the watermarked attribute. The logic is similar but not identical to pushing through a generic UnaryNode: as there, only a prefix of the conjuncts is pushed (a `span` rather than a full `partition`), but here the prefix also stops at the first predicate that references the watermarked attribute, not just at the first nondeterministic one.
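
For illustration only (not part of this patch), a rough sketch of the user-visible effect using the built-in `rate` streaming source, which emits `timestamp` and `value` columns; the predicates and constants below are made up:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").appName("watermark-pushdown").getOrCreate()
import spark.implicits._

// The rate source produces rows with (timestamp, value) columns.
val events = spark.readStream.format("rate").load()

val filtered = events
  .withWatermark("timestamp", "10 minutes")                   // introduces an EventTimeWatermark node
  .where($"value" % 2 === 0)                                   // deterministic, no event-time reference: pushed below the watermark
  .where($"timestamp" > lit("2017-01-01").cast("timestamp"))  // references the watermarked column: stays above it

// filtered.explain(true) should show the value predicate below EventTimeWatermark once this
// rule runs; only the prefix of pushable conjuncts is moved.
```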

## How was this patch tested?
Unit tests added to FilterPushdownSuite covering pushdown stopped at the watermark attribute, pushdown stopped at a nondeterministic predicate, full pushdown, and the empty (no-op) pushdown case.

Author: Jose Torres <joseph-torres@databricks.com>

Closes #18790 from joseph-torres/SPARK-21587.
---
 .../sql/catalyst/optimizer/Optimizer.scala    | 19 +++++++
 .../optimizer/FilterPushdownSuite.scala       | 57 +++++++++++++++++++
 2 files changed, 76 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d82af94dbf..a51b385399 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -867,6 +867,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
         filter
       }
 
+    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
+      // We can only push deterministic predicates that don't reference the watermark attribute.
+      // In theory we could span() only on determinism and separately pull out the deterministic
+      // predicates that reference the watermark, but that seems unnecessary and more confusing
+      // than simply using the prefix, as we do for nondeterminism in other cases.
+
+      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
+        p => p.deterministic && !p.references.contains(watermark.eventTime))
+
+      if (pushDown.nonEmpty) {
+        val pushDownPredicate = pushDown.reduceLeft(And)
+        val newWatermark = watermark.copy(child = Filter(pushDownPredicate, watermark.child))
+        // If no predicates need to stay up, eliminate the original Filter entirely.
+        // Otherwise, create "Filter(stayUp) <- watermark <- Filter(pushDownPredicate)".
+        if (stayUp.isEmpty) newWatermark else Filter(stayUp.reduceLeft(And), newWatermark)
+      } else {
+        filter
+      }
+
     case filter @ Filter(_, u: UnaryNode)
         if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
       pushDownPredicate(filter, u.child) { predicate =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 3553d23560..582b3ead5e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.unsafe.types.CalendarInterval
 
 class FilterPushdownSuite extends PlanTest {
 
@@ -1134,4 +1135,60 @@ class FilterPushdownSuite extends PlanTest {
     comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
       checkAnalysis = false)
   }
+
+  test("watermark pushdown: no pushdown on watermark attribute") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that conditions preceding the first condition that references the watermark
+    // attribute are pushed down by the optimizer and the rest stay above the watermark.
+    val originalQuery = EventTimeWatermark('b, interval, testRelation)
+      .where('a === 5 && 'b === 10 && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'b, interval, testRelation.where('a === 5))
+      .where('b === 10 && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: no pushdown for nondeterministic filter") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that conditions preceding the first nondeterministic condition are pushed down
+    // by the optimizer and the rest stay above the watermark.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === Rand(10) && 'c === 5)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5))
+      .where('b === Rand(10) && 'c === 5)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: full pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that when every condition is deterministic and none references the watermark
+    // attribute, the entire filter is pushed below the watermark and the Filter is eliminated.
+    val originalQuery = EventTimeWatermark('c, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+    val correctAnswer = EventTimeWatermark(
+      'c, interval, testRelation.where('a === 5 && 'b === 10))
+
+    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+      checkAnalysis = false)
+  }
+
+  test("watermark pushdown: empty pushdown") {
+    val interval = new CalendarInterval(2, 2000L)
+
+    // Verify that when the first condition references the watermark attribute, nothing is
+    // pushed down and the plan is left unchanged.
+    val originalQuery = EventTimeWatermark('a, interval, testRelation)
+      .where('a === 5 && 'b === 10)
+
+    comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
+      checkAnalysis = false)
+  }
 }
-- 
GitLab