From cdcccd7b41c43d79edff2fec7a84cd00e9524f75 Mon Sep 17 00:00:00 2001
From: KaiXinXiaoLei <584620569@qq.com>
Date: Fri, 2 Mar 2018 00:09:44 +0800
Subject: [PATCH] [SPARK-23405] Generate additional constraints for Join's
 children

## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)
I run a sql: `select ls.cs_order_number from ls left semi join catalog_sales cs on ls.cs_order_number = cs.cs_order_number`, The `ls` table is a small table ,and the number is one. The `catalog_sales` table is a big table,  and the number is 10 billion. The task will be hang up. And i find the many null values of `cs_order_number` in the `catalog_sales` table. I think the null value should be removed in the logical plan.

>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>   : +- Filter isnotnull(cs_order_number#1)
>      : +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   +- MetastoreRelation 100t, catalog_sales

Now, use this patch, the plan will be:
>== Optimized Logical Plan ==
>Join LeftSemi, (cs_order_number#1 = cs_order_number#22)
>:- Project cs_order_number#1
>   : +- Filter isnotnull(cs_order_number#1)
>      : +- MetastoreRelation 100t, ls
>+- Project cs_order_number#22
>   : **+- Filter isnotnull(cs_order_number#22)**
>     :+- MetastoreRelation 100t, catalog_sales

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: KaiXinXiaoLei <584620569@qq.com>
Author: hanghang <584620569@qq.com>

Closes #20670 from KaiXinXiaoLei/Spark-23405.
---
 .../sql/catalyst/optimizer/Optimizer.scala    |  2 +-
 .../plans/logical/QueryPlanConstraints.scala  | 27 ++++++++++---------
 .../InferFiltersFromConstraintsSuite.scala    | 12 +++++++++
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a28b6a0feb..91208479be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -661,7 +661,7 @@ object InferFiltersFromConstraints extends Rule[LogicalPlan] with PredicateHelpe
     case join @ Join(left, right, joinType, conditionOpt) =>
       // Only consider constraints that can be pushed down completely to either the left or the
       // right child
-      val constraints = join.constraints.filter { c =>
+      val constraints = join.allConstraints.filter { c =>
         c.references.subsetOf(left.outputSet) || c.references.subsetOf(right.outputSet)
       }
       // Remove those constraints that are already enforced by either the left or the right child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
index 5c7b8e5b97..0468488755 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
@@ -23,25 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
 trait QueryPlanConstraints { self: LogicalPlan =>
 
   /**
-   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
-   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
-   * evaluate to `true` for all rows produced.
+   * An [[ExpressionSet]] that contains an additional set of constraints, such as equality
+   * constraints and `isNotNull` constraints, etc.
    */
-  lazy val constraints: ExpressionSet = {
+  lazy val allConstraints: ExpressionSet = {
     if (conf.constraintPropagationEnabled) {
-      ExpressionSet(
-        validConstraints
-          .union(inferAdditionalConstraints(validConstraints))
-          .union(constructIsNotNullConstraints(validConstraints))
-          .filter { c =>
-            c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
-          }
-      )
+      ExpressionSet(validConstraints
+        .union(inferAdditionalConstraints(validConstraints))
+        .union(constructIsNotNullConstraints(validConstraints)))
     } else {
       ExpressionSet(Set.empty)
     }
   }
 
+  /**
+   * An [[ExpressionSet]] that contains invariants about the rows output by this operator. For
+   * example, if this set contains the expression `a = 2` then that expression is guaranteed to
+   * evaluate to `true` for all rows produced.
+   */
+  lazy val constraints: ExpressionSet = ExpressionSet(allConstraints.filter { c =>
+        c.references.nonEmpty && c.references.subsetOf(outputSet) && c.deterministic
+      })
+
   /**
    * This method can be overridden by any child class of QueryPlan to specify a set of constraints
    * based on the given operator's constraint propagation logic. These constraints are then
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index 178c4b8c27..f78c2356e3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -192,4 +192,16 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
 
     comparePlans(Optimize.execute(original.analyze), correct.analyze)
   }
+
+  test("SPARK-23405: left-semi equal-join should filter out null join keys on both sides") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val condition = Some("x.a".attr === "y.a".attr)
+    val originalQuery = x.join(y, LeftSemi, condition).analyze
+    val left = x.where(IsNotNull('a))
+    val right = y.where(IsNotNull('a))
+    val correctAnswer = left.join(right, LeftSemi, condition).analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
 }
-- 
GitLab