Skip to content
Snippets Groups Projects
Commit 32b259fa authored by Herman van Hovell's avatar Herman van Hovell
Browse files

[SPARK-18597][SQL] Do not push-down join conditions to the right side of a LEFT ANTI join


## What changes were proposed in this pull request?
We currently push down join conditions of a Left Anti join to both sides of the join. This is similar to Inner, Left Semi and Existence (a specialized left semi) join. The problem is that this changes the semantics of the join: a left anti join filters out rows that match the join condition, so applying a left-only condition as a filter below the join keeps exactly the rows the join should have removed.

This PR fixes this by only pushing down conditions to the right hand side of the join; conditions that reference only the left hand side stay in the join condition. This is similar to the behavior of left outer join.

## How was this patch tested?
Added tests to `FilterPushdownSuite.scala` and created a SQLQueryTestSuite file for left anti joins with a regression test.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #16026 from hvanhovell/SPARK-18597.

(cherry picked from commit 38e29824)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
parent a9d4febe
No related branches found
No related tags found
No related merge requests found
......@@ -932,7 +932,7 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)
joinType match {
case _: InnerLike | LeftExistence(_) =>
case _: InnerLike | LeftSemi | ExistenceJoin(_) =>
// push down the single side only join filter for both sides sub queries
val newLeft = leftJoinConditions.
reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
......@@ -949,14 +949,14 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
val newJoinCond = (rightJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, RightOuter, newJoinCond)
case LeftOuter =>
case LeftOuter | LeftAnti =>
// push down the right side only join filter for right sub query
val newLeft = left
val newRight = rightJoinConditions.
reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
val newJoinCond = (leftJoinConditions ++ commonJoinCondition).reduceLeftOption(And)
Join(newLeft, newRight, LeftOuter, newJoinCond)
Join(newLeft, newRight, joinType, newJoinCond)
case FullOuter => j
case NaturalJoin(_) => sys.error("Untransformed NaturalJoin node")
case UsingJoin(_, _) => sys.error("Untransformed Using join node")
......
......@@ -514,6 +514,39 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
test("joins: push down where clause into left anti join") {
  // A WHERE predicate that references only the left relation is expected to
  // end up below the anti join, directly on top of the left input.
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)
  val joinCondition = Some("x.b".attr === "y.b".attr)
  val leftOnlyFilter = "x.a".attr > 10

  // Input plan: the filter sits above the join.
  val originalQuery = x.join(y, LeftAnti, joinCondition).where(leftOnlyFilter).analyze
  // Expected plan: the filter has been moved onto the left side of the join.
  val correctAnswer = x.where(leftOnlyFilter).join(y, LeftAnti, joinCondition).analyze

  val optimized = Optimize.execute(originalQuery)
  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
test("joins: only push down join conditions to the right of a left anti join") {
  val x = testRelation.subquery('x)
  val y = testRelation.subquery('y)

  // The join condition is built from three conjuncts: one referencing both
  // sides, one referencing only the right side and one only the left side.
  val bothSides = "x.b".attr === "y.b".attr
  val rightOnly = "y.a".attr > 10
  val leftOnly = "x.a".attr > 10

  val originalQuery = x.join(y, LeftAnti, Some(bothSides && rightOnly && leftOnly)).analyze

  // Expected plan: the right-only conjunct becomes a filter on the right input,
  // while the left-only conjunct stays inside the join condition.
  val correctAnswer =
    x.join(y.where(rightOnly), LeftAnti, Some(bothSides && leftOnly)).analyze

  val optimized = Optimize.execute(originalQuery)
  comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
}
val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
test("generate: predicate referenced no generated column") {
......
-- SPARK-18597: Do not push down predicates to left hand side in an anti-join
-- Left-side input: three rows, of which only (1, 1) has c1 equal to c2.
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2);
-- Right-side input: a single row, so the join result is decided by the ON condition alone.
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1);
-- The ON condition below references only tbl_a (left side) columns.
SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2);
-- Automatically generated by SQLQueryTestSuite
-- Number of queries: 3
-- !query 0
CREATE OR REPLACE TEMPORARY VIEW tbl_a AS VALUES (1, 1), (2, 1), (3, 6) AS T(c1, c2)
-- !query 0 schema
struct<>
-- !query 0 output
-- !query 1
CREATE OR REPLACE TEMPORARY VIEW tbl_b AS VALUES 1 AS T(c1)
-- !query 1 schema
struct<>
-- !query 1 output
-- !query 2
SELECT *
FROM tbl_a
LEFT ANTI JOIN tbl_b ON ((tbl_a.c1 = tbl_a.c2) IS NULL OR tbl_a.c1 = tbl_a.c2)
-- !query 2 schema
struct<c1:int,c2:int>
-- !query 2 output
2 1
3 6
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment