diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 586bf3d4dd9769e72fdd4f50cffd0630a340fc0b..650b4eef6e4490672d98bafe49c26f86adc8affb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -86,7 +86,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { BooleanSimplification, SimplifyConditionals, RemoveDispensableExpressions, - SimplifyFilters, + PruneFilters, SimplifyCasts, SimplifyCaseConversionExpressions, EliminateSerialization) :: @@ -827,11 +827,12 @@ object CombineFilters extends Rule[LogicalPlan] { } /** - * Removes filters that can be evaluated trivially. This is done either by eliding the filter for - * cases where it will always evaluate to `true`, or substituting a dummy empty relation when the - * filter will always evaluate to `false`. + * Removes filters that can be evaluated trivially. This can be done through the following ways: + * 1) by eliding the filter for cases where it will always evaluate to `true`. + * 2) by substituting a dummy empty relation when the filter will always evaluate to `false`. + * 3) by eliminating the always-true conditions given the constraints on the child's output. */ -object SimplifyFilters extends Rule[LogicalPlan] { +object PruneFilters extends Rule[LogicalPlan] with PredicateHelper { def apply(plan: LogicalPlan): LogicalPlan = plan transform { // If the filter condition always evaluate to true, remove the filter. case Filter(Literal(true, BooleanType), child) => child @@ -839,6 +840,21 @@ object SimplifyFilters extends Rule[LogicalPlan] { // replace the input with an empty relation. case Filter(Literal(null, _), child) => LocalRelation(child.output, data = Seq.empty) case Filter(Literal(false, BooleanType), child) => LocalRelation(child.output, data = Seq.empty) + // If any deterministic condition is guaranteed to be true given the constraints on the child's + // output, remove the condition + case f @ Filter(fc, p: LogicalPlan) => + val (prunedPredicates, remainingPredicates) = + splitConjunctivePredicates(fc).partition { cond => + cond.deterministic && p.constraints.contains(cond) + } + if (prunedPredicates.isEmpty) { + f + } else if (remainingPredicates.isEmpty) { + p + } else { + val newCond = remainingPredicates.reduce(And) + Filter(newCond, p) + } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 3e52441519ae21905270c8e0574a6b53834a42f9..da43751b0a310926dbb1798a2451fdd2fd4974b1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -36,7 +36,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { NullPropagation, ConstantFolding, BooleanSimplification, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int, 'd.string) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala new file mode 100644 index 0000000000000000000000000000000000000000..0ee7cf92097e158fce97744ebc3f2ed89d972bad --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class PruneFiltersSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + EliminateSubqueryAliases) :: + Batch("Filter Pushdown and Pruning", Once, + CombineFilters, + PruneFilters, + PushPredicateThroughProject, + PushPredicateThroughJoin) :: Nil + } + + val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + test("Constraints of isNull + LeftOuter") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithUselessFilter = query.where("x.b".attr.isNull) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Constraints of unionall") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int) + val tr2 = LocalRelation('d.int, 'e.int, 'f.int) + val tr3 = LocalRelation('g.int, 'h.int, 'i.int) + + val query = + tr1.where('a.attr > 10) + .unionAll(tr2.where('d.attr > 10) + .unionAll(tr3.where('g.attr > 10))) + val queryWithUselessFilter = query.where('a.attr > 10) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Pruning multiple constraints in the same run") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + val query = tr1 + .where("tr1.a".attr > 10 || "tr1.c".attr < 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.a".attr)) + // different order of "tr2.a" and "tr1.a" + val queryWithUselessFilter = + query.where( + ("tr1.a".attr > 10 || "tr1.c".attr < 10) && + 'd.attr < 100 && + "tr2.a".attr === "tr1.a".attr) + + val optimized = Optimize.execute(queryWithUselessFilter.analyze) + val correctAnswer = query.analyze + + comparePlans(optimized, correctAnswer) + } + + test("Partial pruning") { + val tr1 = LocalRelation('a.int, 'b.int, 'c.int).subquery('tr1) + val tr2 = LocalRelation('a.int, 'd.int, 'e.int).subquery('tr2) + + // One of the filter condition does not exist in the constraints of its child + // Thus, the filter is not removed + val query = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), Inner, Some("tr1.a".attr === "tr2.d".attr)) + val queryWithExtraFilters = + query.where("tr1.a".attr > 10 && 'd.attr < 100 && "tr1.a".attr === "tr2.a".attr) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = tr1 + .where("tr1.a".attr > 10) + .join(tr2.where('d.attr < 100), + Inner, + Some("tr1.a".attr === "tr2.a".attr && "tr1.a".attr === "tr2.d".attr)).analyze + + comparePlans(optimized, correctAnswer) + } + + test("No predicate is pruned") { + val x = testRelation.subquery('x) + val y = testRelation.subquery('y) + + val query = x.where("x.b".attr.isNull).join(y, LeftOuter) + val queryWithExtraFilters = query.where("x.b".attr.isNotNull) + + val optimized = Optimize.execute(queryWithExtraFilters.analyze) + val correctAnswer = + testRelation.where("b".attr.isNull).where("b".attr.isNotNull) + .join(testRelation, LeftOuter).analyze + + comparePlans(optimized, correctAnswer) + } + + test("Nondeterministic predicate is not pruned") { + val originalQuery = testRelation.where(Rand(10) > 5).select('a).where(Rand(10) > 5).analyze + val optimized = Optimize.execute(originalQuery) + val correctAnswer = testRelation.where(Rand(10) > 5).where(Rand(10) > 5).select('a).analyze + comparePlans(optimized, correctAnswer) + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala index 50f3b512d94edb2a8903d7af8fff7cb331a8fb18..b08cdc8a3658ef55d06cce41d55ba2003d8afc0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala @@ -32,7 +32,7 @@ class SetOperationSuite extends PlanTest { Batch("Union Pushdown", Once, CombineUnions, SetOperationPushDown, - SimplifyFilters) :: Nil + PruneFilters) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 5a5cb5cf03d4a161e8786fe4c10241d576464c81..95afdc789f322779c6dadc8519a054cf6310d731 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -234,7 +234,7 @@ private[sql] object ParquetFilters { // // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, // which can be casted to `false` implicitly. Please refer to the `eval` method of these - // operators and the `SimplifyFilters` rule for details. + // operators and the `PruneFilters` rule for details. // Hyukjin: // I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].