Commit 0fb73253 authored by Jose Torres, committed by Tathagata Das

[SPARK-21587][SS] Added filter pushdown through watermarks.

## What changes were proposed in this pull request?

Push filter predicates through EventTimeWatermark if they're deterministic and do not reference the watermarked attribute. (This is similar but not identical to the logic for pushing through UnaryNode.)
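For context, a minimal sketch of the kind of query this affects (the `rate` source, column names, and thresholds are illustrative, not taken from the patch): a deterministic predicate on a non-watermarked column can now be evaluated below the `EventTimeWatermark` node, while the predicate on the event-time column stays above it.

```scala
// Hypothetical streaming query; the "rate" source and the deviceId/timestamp
// columns are illustrative only.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("watermark-pushdown-sketch").getOrCreate()

val events = spark.readStream
  .format("rate")                                // emits (timestamp, value)
  .load()
  .withColumn("deviceId", col("value") % 10)

// The predicate on deviceId is deterministic and does not reference the
// watermarked column, so the optimizer may push it below EventTimeWatermark;
// the predicate on timestamp must stay above it. A real job would attach a
// sink and start the query.
val filtered = events
  .withWatermark("timestamp", "10 minutes")
  .filter(col("deviceId") === 3 && col("timestamp") >= "2017-08-01")
```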

## How was this patch tested?

Unit tests added to `FilterPushdownSuite`.

Author: Jose Torres <joseph-torres@databricks.com>

Closes #18790 from joseph-torres/SPARK-21587.
parent 2d799d08
@@ -867,6 +867,25 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
        filter
      }

    case filter @ Filter(condition, watermark: EventTimeWatermark) =>
      // We can only push deterministic predicates which don't reference the watermark attribute.
      // We could in theory span() only on determinism and pull out deterministic predicates
      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
      // use the prefix as we do for nondeterminism in other cases.
      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
        p => p.deterministic && !p.references.contains(watermark.eventTime))

      if (pushDown.nonEmpty) {
        val pushDownPredicate = pushDown.reduceLeft(And)
        val newWatermark = watermark.copy(child = Filter(pushDownPredicate, watermark.child))
        // If there is no more filter to stay up, just eliminate the filter.
        // Otherwise, create "Filter(stayUp) <- watermark <- Filter(pushDownPredicate)".
        if (stayUp.isEmpty) newWatermark else Filter(stayUp.reduceLeft(And), newWatermark)
      } else {
        filter
      }

    case filter @ Filter(_, u: UnaryNode)
        if canPushThrough(u) && u.expressions.forall(_.deterministic) =>
      pushDownPredicate(filter, u.child) { predicate =>
......
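The `span()` call above deliberately keeps only the longest prefix of conjuncts that are both deterministic and free of the watermark attribute; a pushable predicate that appears after a blocking one is not pulled forward. A plain-Scala sketch of that prefix behaviour (the tuples stand in for Catalyst predicates and are purely illustrative):

```scala
// Plain-Scala sketch of span()'s prefix behaviour; each tuple stands in for a
// Catalyst predicate: (name, deterministic, referencesWatermark).
val conjuncts = Seq(
  ("a = 5",  true, false),  // pushable
  ("b = 10", true, true),   // blocks pushdown: references the watermark column
  ("c = 5",  true, false))  // pushable on its own, but it follows the blocker

val (pushDown, stayUp) =
  conjuncts.span { case (_, deterministic, refsWatermark) => deterministic && !refsWatermark }

// pushDown == Seq(("a = 5", true, false))
// stayUp   == Seq(("b = 10", true, true), ("c = 5", true, false))
```

This mirrors the first test below, where `'c === 5` stays above the watermark because it follows the condition on the watermarked column `'b`.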
@@ -26,6 +26,7 @@
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.unsafe.types.CalendarInterval

class FilterPushdownSuite extends PlanTest {
@@ -1134,4 +1135,60 @@ class FilterPushdownSuite extends PlanTest {
    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
      checkAnalysis = false)
  }
test("watermark pushdown: no pushdown on watermark attribute") {
val interval = new CalendarInterval(2, 2000L)
// Verify that all conditions preceding the first watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('b, interval, testRelation)
.where('a === 5 && 'b === 10 && 'c === 5)
val correctAnswer = EventTimeWatermark(
'b, interval, testRelation.where('a === 5))
.where('b === 10 && 'c === 5)
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
}
test("watermark pushdown: no pushdown for nondeterministic filter") {
val interval = new CalendarInterval(2, 2000L)
// Verify that all conditions preceding the first watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('c, interval, testRelation)
.where('a === 5 && 'b === Rand(10) && 'c === 5)
val correctAnswer = EventTimeWatermark(
'c, interval, testRelation.where('a === 5))
.where('b === Rand(10) && 'c === 5)
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
}
test("watermark pushdown: full pushdown") {
val interval = new CalendarInterval(2, 2000L)
// Verify that all conditions preceding the first watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('c, interval, testRelation)
.where('a === 5 && 'b === 10)
val correctAnswer = EventTimeWatermark(
'c, interval, testRelation.where('a === 5 && 'b === 10))
comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
checkAnalysis = false)
}
test("watermark pushdown: empty pushdown") {
val interval = new CalendarInterval(2, 2000L)
// Verify that all conditions preceding the first watermark touching condition are pushed down
// by the optimizer and others are not.
val originalQuery = EventTimeWatermark('a, interval, testRelation)
.where('a === 5 && 'b === 10)
comparePlans(Optimize.execute(originalQuery.analyze), originalQuery.analyze,
checkAnalysis = false)
}
}