From ce7293c150c71a872d20beda44b12dec9deca18d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh <viirya@gmail.com> Date: Wed, 6 Sep 2017 22:15:25 -0700 Subject: [PATCH] [SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans ## What changes were proposed in this pull request? This is a follow-up of #19050 to deal with `ExistenceJoin` case. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #19151 from viirya/SPARK-21835-followup. --- .../spark/sql/catalyst/optimizer/subquery.scala | 11 +++++++---- .../scala/org/apache/spark/sql/SubquerySuite.scala | 12 ++++++++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 7ff891516d..64b28565eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { } } - private def dedupJoin(joinPlan: Join): Join = joinPlan match { + private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match { // SPARK-21835: It is possibly that the two sides of the join have conflicting attributes, // the produced join then becomes unresolved and break structural integrity. We should // de-duplicate conflicting attributes. We don't use transformation here because we only // care about the most top join converted from correlated predicate subquery. - case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) => + case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) => val duplicates = right.outputSet.intersect(left.outputSet) if (duplicates.nonEmpty) { val aliasMap = AttributeMap(duplicates.map { dup => @@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { e transformUp { case Exists(sub, conditions, _) => val exists = AttributeReference("exists", BooleanType, nullable = false)() - newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)) + // Deduplicate conflicting attributes if any. + newPlan = dedupJoin( + Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))) exists case In(value, Seq(ListQuery(sub, conditions, _, _))) => val exists = AttributeReference("exists", BooleanType, nullable = false)() val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled) val newConditions = (inConditions ++ conditions).reduceLeftOption(And) - newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions) + // Deduplicate conflicting attributes if any. + newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions)) exists } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index ee6905e999..8673dc14f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext { } } } + + test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") { + val sqlText = + """ + |SELECT * FROM l, r WHERE l.a = r.c + 1 AND + |(EXISTS (SELECT * FROM r) OR l.a = r.c) + """.stripMargin + val optimizedPlan = sql(sqlText).queryExecution.optimizedPlan + val join = optimizedPlan.collectFirst { case j: Join => j }.get + assert(join.duplicateResolved) + assert(optimizedPlan.resolved) + } } -- GitLab