Skip to content
Snippets Groups Projects
Commit f62e3260 authored by nitin goyal's avatar nitin goyal Committed by Michael Armbrust
Browse files

[SPARK-11179] [SQL] Push filters through aggregate

Push conjunctive predicates through Aggregate operators when their references are a subset of the groupingExpressions.

Query plan before optimisation :-
Filter ((c#138L = 2) && (a#0 = 3))
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Project [a#0,b#1]
   LocalRelation [a#0,b#1,c#2]

Query plan after optimisation :-
Filter (c#138L = 2)
 Aggregate [a#0], [a#0,count(b#1) AS c#138L]
  Filter (a#0 = 3)
   Project [a#0,b#1]
    LocalRelation [a#0,b#1,c#2]

Author: nitin goyal <nitin.goyal@guavus.com>
Author: nitin.goyal <nitin.goyal@guavus.com>

Closes #9167 from nitin2goyal/master.
parent 8e82e598
No related branches found
No related tags found
No related merge requests found
......@@ -46,6 +46,7 @@ object DefaultOptimizer extends Optimizer {
PushPredicateThroughJoin,
PushPredicateThroughProject,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
// Operator combine
ProjectCollapsing,
......@@ -674,6 +675,29 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
}
}
/**
 * Push [[Filter]] operators through [[Aggregate]] operators. The filter condition is split into
 * its conjuncts; every deterministic conjunct whose references are a subset of the attributes
 * referenced by the grouping expressions is pushed beneath the [[Aggregate]], and the remaining
 * conjuncts stay above it.
 *
 * Nothing is pushed when the [[Aggregate]] has no grouping expressions: a global aggregate
 * produces a result row even for empty input, so filtering its input is not equivalent to
 * filtering its output (e.g. a constant `false` conjunct must discard the aggregate's row).
 */
object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case filter @ Filter(condition, aggregate @ Aggregate(groupingExpressions, _, grandChild))
        if groupingExpressions.nonEmpty =>
      // Computed once; it does not depend on the individual conjunct.
      val groupingAttributes = AttributeSet(groupingExpressions)
      val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { conjunct =>
        // A non-deterministic conjunct would be evaluated per input row instead of per group
        // if it were moved beneath the aggregate, so it has to stay above.
        conjunct.deterministic && conjunct.references.subsetOf(groupingAttributes)
      }
      if (pushDown.nonEmpty) {
        val pushDownPredicate = pushDown.reduce(And)
        val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
        // Re-apply whatever could not be pushed on top of the rewritten aggregate.
        stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
      } else {
        filter
      }
  }
}
/**
* Pushes down [[Filter]] operators where the `condition` can be
* evaluated using only the attributes of the left or right side of a join. Other
......
......@@ -40,6 +40,7 @@ class FilterPushdownSuite extends PlanTest {
BooleanSimplification,
PushPredicateThroughJoin,
PushPredicateThroughGenerate,
PushPredicateThroughAggregate,
ColumnPruning,
ProjectCollapsing) :: Nil
}
......@@ -652,4 +653,48 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, correctAnswer.analyze)
}
test("aggregate: push down filter when filter on group by expression") {
  // Expected shape: the predicate on the grouping column sits beneath the aggregate.
  val expectedPlan = testRelation
    .where('a === 2)
    .groupBy('a)('a, Count('b) as 'c)
    .analyze

  // Input shape: the same predicate applied above the aggregate and a projection.
  val inputPlan = testRelation
    .groupBy('a)('a, Count('b) as 'c)
    .select('a, 'c)
    .where('a === 2)
  val actualPlan = Optimize.execute(inputPlan.analyze)

  comparePlans(actualPlan, expectedPlan)
}
test("aggregate: don't push down filter when filter not on group by expression") {
  // The predicate only references the aggregated column 'c, so the optimized plan is
  // compared against the analyzed input plan itself.
  val inputPlan = testRelation
    .select('a, 'b)
    .groupBy('a)('a, Count('b) as 'c)
    .where('c === 2L)

  val actualPlan = Optimize.execute(inputPlan.analyze)
  val expectedPlan = inputPlan.analyze

  comparePlans(actualPlan, expectedPlan)
}
test("aggregate: push down filters partially which are subset of group by expressions") {
  // Expected shape: the conjunct on grouping column 'a is beneath the aggregate, while the
  // conjunct on the aggregated column 'c remains above it.
  val expectedPlan = testRelation
    .select('a, 'b)
    .where('a === 3)
    .groupBy('a)('a, Count('b) as 'c)
    .where('c === 2L)
    .analyze

  // Input shape: both conjuncts combined in a single filter above the aggregate.
  val inputPlan = testRelation
    .select('a, 'b)
    .groupBy('a)('a, Count('b) as 'c)
    .where('c === 2L && 'a === 3)
  val actualPlan = Optimize.execute(inputPlan.analyze)

  comparePlans(actualPlan, expectedPlan)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment