Skip to content
Snippets Groups Projects
Commit 36006352 authored by Nattavut Sutyanyong's avatar Nattavut Sutyanyong Committed by Herman van Hovell
Browse files

[SPARK-18614][SQL] Incorrect predicate pushdown from ExistenceJoin

## What changes were proposed in this pull request?

ExistenceJoin should be treated the same as LeftOuter and LeftAnti, not InnerLike and LeftSemi. This is not currently exposed because the rewrite of [NOT] EXISTS OR ... to ExistenceJoin happens in rule RewritePredicateSubquery, which is in a separate rule set and placed after the rule PushPredicateThroughJoin. During the transformation in the rule PushPredicateThroughJoin, an ExistenceJoin never exists.

The semantics of ExistenceJoin says we need to preserve all the rows from the left table through the join operation as if it is a regular LeftOuter join. The ExistenceJoin augments the LeftOuter operation with a new column called exists, set to true when the join condition in the ON clause is true and false otherwise. The filter of any rows will happen in the Filter operation above the ExistenceJoin.

Example:

A(c1, c2): { (1, 1), (1, 2) }
// B can be any value as it is irrelevant in this example
B(c1): { (NULL) }

select A.*
from   A
where  exists (select 1 from B where A.c1 = A.c2)
       or A.c2=2

In this example, the correct result is all the rows from A. If the pattern ExistenceJoin around line 935 in Optimizer.scala is indeed active, the code will push down the predicate A.c1 = A.c2 to be a Filter on relation A, which will incorrectly filter the row (1,2) from A.

## How was this patch tested?

Since this is not an exposed case, no new test cases is added. The scenario is discovered via a code review of another PR and confirmed to be valid with peer.

Author: Nattavut Sutyanyong <nsy.can@gmail.com>

Closes #16044 from nsyca/spark-18614.
parent f8878a4c
No related branches found
No related tags found
No related merge requests found
...@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { ...@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match { joinType match {
case _: InnerLike | LeftSemi | ExistenceJoin(_) => case _: InnerLike | LeftSemi =>
// push down the single side only join filter for both sides sub queries // push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions. val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
...@@ -949,7 +949,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { ...@@ -949,7 +949,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And) val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, RightOuter, newJoinCond) Join(newLeft, newRight, RightOuter, newJoinCond)
case LeftOuter | LeftAnti => case LeftOuter | LeftAnti | ExistenceJoin(_) =>
// push down the right side only join filter for right sub query // push down the right side only join filter for right sub query
val newLeft = left val newLeft = left
val newRight = rightJoinConditions. val newRight = rightJoinConditions.
......
...@@ -546,6 +546,23 @@ class FilterPushdownSuite extends PlanTest { ...@@ -546,6 +546,23 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
} }
test("joins: only push down join conditions to the right of an existence join") {
val x = testRelation.subquery('x)
val y = testRelation.subquery('y)
val fillerVal = 'val.boolean
val originalQuery =
x.join(y,
ExistenceJoin(fillerVal),
Some("x.a".attr > 1 && "y.b".attr > 2)).analyze
val optimized = Optimize.execute(originalQuery)
val correctAnswer =
x.join(
y.where("y.b".attr > 2),
ExistenceJoin(fillerVal),
Some("x.a".attr > 1))
.analyze
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
......
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2); CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1); CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
SELECT * SELECT *
FROM tbl_a FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2); LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
-- SPARK-18614: Do not push down predicates on left table below ExistenceJoin
SELECT l.c1, l.c2
FROM tbl_a l
WHERE EXISTS (SELECT 1 FROM tbl_b r WHERE l.c1 = l.c2) OR l.c2 < 2;
-- Automatically generated by SQLQueryTestSuite -- Automatically generated by SQLQueryTestSuite
-- Number of queries: 3 -- Number of queries: 4
-- !query 0 -- !query 0
...@@ -27,3 +27,14 @@ struct<c1:int,c2:int> ...@@ -27,3 +27,14 @@ struct<c1:int,c2:int>
-- !query 2 output -- !query 2 output
2 1 2 1
3 6 3 6
-- !query 3
SELECT l.c1, l.c2
FROM tbl_a l
WHERE EXISTS (SELECT 1 FROM tbl_b r WHERE l.c1 = l.c2) OR l.c2 < 2
-- !query 3 schema
struct<c1:int,c2:int>
-- !query 3 output
1 1
2 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment