Skip to content
Snippets Groups Projects
Commit ce7293c1 authored by Liang-Chi Hsieh, committed by gatorsmile
Browse files

[SPARK-21835][SQL][FOLLOW-UP] RewritePredicateSubquery should not produce unresolved query plans

## What changes were proposed in this pull request?

This is a follow-up of #19050 to deal with the `ExistenceJoin` case.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19151 from viirya/SPARK-21835-followup.
parent aad21254
No related branches found
No related tags found
No related merge requests found
......@@ -49,12 +49,12 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
}
}
private def dedupJoin(joinPlan: Join): Join = joinPlan match {
private def dedupJoin(joinPlan: LogicalPlan): LogicalPlan = joinPlan match {
// SPARK-21835: It is possible that the two sides of the join have conflicting attributes,
// in which case the produced join becomes unresolved and breaks structural integrity. We
// should de-duplicate conflicting attributes. We don't use transformation here because we
// only care about the topmost join converted from a correlated predicate subquery.
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti), joinCond) =>
case j @ Join(left, right, joinType @ (LeftSemi | LeftAnti | ExistenceJoin(_)), joinCond) =>
val duplicates = right.outputSet.intersect(left.outputSet)
if (duplicates.nonEmpty) {
val aliasMap = AttributeMap(duplicates.map { dup =>
......@@ -145,13 +145,16 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
e transformUp {
case Exists(sub, conditions, _) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
newPlan = Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And))
// Deduplicate conflicting attributes if any.
newPlan = dedupJoin(
Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
exists
case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
val exists = AttributeReference("exists", BooleanType, nullable = false)()
val inConditions = getValueExpression(value).zip(sub.output).map(EqualTo.tupled)
val newConditions = (inConditions ++ conditions).reduceLeftOption(And)
newPlan = Join(newPlan, sub, ExistenceJoin(exists), newConditions)
// Deduplicate conflicting attributes if any.
newPlan = dedupJoin(Join(newPlan, sub, ExistenceJoin(exists), newConditions))
exists
}
}
......
......@@ -938,4 +938,16 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
}
}
}
test("SPARK-21835: Join in correlated subquery should be duplicateResolved: case 3") {
// The EXISTS predicate appears under an OR, and its subquery reads `r`, a relation the
// outer query reads as well, so both sides of the rewritten join may expose the same
// attributes.
val query =
"""
|SELECT * FROM l, r WHERE l.a = r.c + 1 AND
|(EXISTS (SELECT * FROM r) OR l.a = r.c)
""".stripMargin
val plan = sql(query).queryExecution.optimizedPlan
// Take the first Join found in the optimized plan; `.get` fails the test if there is none.
val firstJoin = plan.collectFirst { case j: Join => j }.get
// The join's two sides must not share attributes, and the whole plan must stay resolved.
assert(firstJoin.duplicateResolved)
assert(plan.resolved)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment