From e51978c3deaa91ae8115c8f2db1af692622a1616 Mon Sep 17 00:00:00 2001 From: Herman van Hovell <hvanhovell@databricks.com> Date: Fri, 4 Nov 2016 21:18:13 +0100 Subject: [PATCH] [SPARK-17337][SQL] Do not pushdown predicates through filters with predicate subqueries ## What changes were proposed in this pull request? The `PushDownPredicate` rule can create a wrong result if we try to push a filter containing a predicate subquery through a project when the subquery and the project share attributes (have the same source). The current PR fixes this by making sure that we do not push down when there is a predicate subquery that outputs the same attributes as the filter's new child plan. ## How was this patch tested? Added a test to `SubquerySuite`. nsyca has done previous work on this. I have taken the test from his initial PR. Author: Herman van Hovell <hvanhovell@databricks.com> Closes #15761 from hvanhovell/SPARK-17337. (cherry picked from commit 550cd56e8b6addb26efe3ce16976c9c34fa0c832) Signed-off-by: Herman van Hovell <hvanhovell@databricks.com> --- .../sql/catalyst/optimizer/Optimizer.scala | 16 ++++++++++++- .../org/apache/spark/sql/SubquerySuite.scala | 24 +++++++++++++++---- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index b6ad5db74e..6ba8b33b3f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -689,7 +689,7 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { // state and all the input rows processed before. In another word, the order of input rows // matters for non-deterministic expressions, while pushing down predicates changes the order. 
case filter @ Filter(condition, project @ Project(fields, grandChild)) - if fields.forall(_.deterministic) => + if fields.forall(_.deterministic) && canPushThroughCondition(grandChild, condition) => // Create a map of Aliases to their values from the child projection. // e.g., 'SELECT a + b AS c, d ...' produces Map(c -> a + b). @@ -830,6 +830,20 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { filter } } + + /** + * Check if we can safely push a filter through a projection, by making sure that predicate + * subqueries in the condition do not contain the same attributes as the plan they are moved + * into. This can happen when the plan and predicate subquery have the same source. + */ + private def canPushThroughCondition(plan: LogicalPlan, condition: Expression): Boolean = { + val attributes = plan.outputSet + val matched = condition.find { + case PredicateSubquery(p, _, _, _) => p.outputSet.intersect(attributes).nonEmpty + case _ => false + } + matched.isEmpty + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index eab45050f7..8934866834 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -608,8 +608,8 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | where exists (select 1 from onerow t2 where t1.c1=t2.c1) | and exists (select 1 from onerow LIMIT 1)""".stripMargin), Row(1) :: Nil) - } - } + } + } test("SPARK-16804: Correlated subqueries containing LIMIT - 2") { withTempView("onerow") { @@ -623,6 +623,22 @@ class SubquerySuite extends QueryTest with SharedSQLContext { | from (select 1 from onerow t2 LIMIT 1) | where t1.c1=t2.c1)""".stripMargin), Row(1) :: Nil) - } - } + } + } + + test("SPARK-17337: Incorrect column resolution leads to incorrect results") { + withTempView("t1", "t2") { + Seq(1, 
2).toDF("c1").createOrReplaceTempView("t1") + Seq(1).toDF("c2").createOrReplaceTempView("t2") + + checkAnswer( + sql( + """ + | select * + | from (select t2.c2+1 as c3 + | from t1 left join t2 on t1.c1=t2.c2) t3 + | where c3 not in (select c2 from t2)""".stripMargin), + Row(2) :: Nil) + } + } } -- GitLab