From 19f4ac6dc766a3d33b04cd071b414c3bea88e97d Mon Sep 17 00:00:00 2001
From: Sameer Agarwal <sameer@databricks.com>
Date: Thu, 10 Mar 2016 12:16:46 -0800
Subject: [PATCH] [SPARK-13759][SQL] Add IsNotNull constraints for expressions
 with an inequality

## What changes were proposed in this pull request?

This PR adds support for inferring `IsNotNull` constraints from expressions containing an inequality (`a !== b`, i.e. `Not(EqualTo(a, b))` in Catalyst). More specifically, if an operator has a condition of the form `a !== b`, we know that neither `a` nor `b` in the operator's output can be null.
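
As a minimal sketch (using the Catalyst test DSL; the relation and column names below are illustrative, not part of this patch), the newly inferred constraints can be inspected directly on an analyzed plan:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Illustrative relation with two nullable int columns.
val rel = LocalRelation('a.int, 'b.int)

// With this change, a filter containing an inequality between the two columns
// yields isnotnull(a) and isnotnull(b) among the plan's constraints, which
// rules such as NullFiltering can then use to prune null rows early.
val plan = rel.where('a =!= 'b).analyze
plan.constraints.foreach(println)
```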

## How was this patch tested?

1. Modified a test in `ConstraintPropagationSuite` to cover expressions with an inequality.
2. Added a test in `NullFilteringSuite` to make sure that an inner join with a "non-equal" condition appropriately filters out nulls from both of its inputs.
3. Updated the expected pushed-down filter in `OrcFilterSuite`, since an `IsNotNull` constraint is now also inferred for `=!=` predicates and pushed down to ORC.

cc nongli

Author: Sameer Agarwal <sameer@databricks.com>

Closes #11594 from sameeragarwal/isnotequal-constraints.
---
 .../spark/sql/catalyst/plans/QueryPlan.scala  |  2 ++
 .../optimizer/NullFilteringSuite.scala        | 21 +++++++++++++++++--
 .../plans/ConstraintPropagationSuite.scala    |  4 ++--
 .../spark/sql/hive/orc/OrcFilterSuite.scala   |  5 +++--
 4 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 371d72ef5a..40c06ed6d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -56,6 +56,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
         Set(IsNotNull(l), IsNotNull(r))
       case LessThanOrEqual(l, r) =>
         Set(IsNotNull(l), IsNotNull(r))
+      case Not(EqualTo(l, r)) =>
+        Set(IsNotNull(l), IsNotNull(r))
       case _ =>
         Set.empty[Expression]
     }.foldLeft(Set.empty[Expression])(_ union _.toSet)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
index 7e52d5ef67..142e4ae6e4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/NullFilteringSuite.scala
@@ -44,11 +44,28 @@ class NullFilteringSuite extends PlanTest {
     val x = testRelation.subquery('x)
     val y = testRelation.subquery('y)
     val originalQuery = x.join(y,
-      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+      condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+      .analyze
     val left = x.where(IsNotNull('a) && IsNotNull('b))
     val right = y.where(IsNotNull('a) && IsNotNull('c))
     val correctAnswer = left.join(right,
-      condition = Some("x.a".attr === "y.a".attr && "x.b".attr === 1 && "y.c".attr > 5)).analyze
+      condition = Some(("x.a".attr === "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+      .analyze
+    val optimized = Optimize.execute(originalQuery)
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("single inner join: filter out nulls on either side on non equal keys") {
+    val x = testRelation.subquery('x)
+    val y = testRelation.subquery('y)
+    val originalQuery = x.join(y,
+      condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+      .analyze
+    val left = x.where(IsNotNull('a) && IsNotNull('b))
+    val right = y.where(IsNotNull('a) && IsNotNull('c))
+    val correctAnswer = left.join(right,
+      condition = Some(("x.a".attr =!= "y.a".attr) && ("x.b".attr === 1) && ("y.c".attr > 5)))
+      .analyze
     val optimized = Optimize.execute(originalQuery)
     comparePlans(optimized, correctAnswer)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
index 868ad934da..e70d3794ab 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala
@@ -64,10 +64,10 @@ class ConstraintPropagationSuite extends SparkFunSuite {
     verifyConstraints(tr
       .where('a.attr > 10)
       .select('c.attr, 'a.attr)
-      .where('c.attr < 100)
+      .where('c.attr =!= 100)
       .analyze.constraints,
       ExpressionSet(Seq(resolveColumn(tr, "a") > 10,
-        resolveColumn(tr, "c") < 100,
+        resolveColumn(tr, "c") =!= 100,
         IsNotNull(resolveColumn(tr, "a")),
         IsNotNull(resolveColumn(tr, "c")))))
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 46c390c436..d76d0c44f5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -212,8 +212,9 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       )
       checkFilterPredicate(
         '_1 =!= 1,
-        """leaf-0 = (EQUALS _1 1)
-          |expr = (not leaf-0)""".stripMargin.trim
+        """leaf-0 = (IS_NULL _1)
+          |leaf-1 = (EQUALS _1 1)
+          |expr = (and (not leaf-0) (not leaf-1))""".stripMargin.trim
       )
       checkFilterPredicate(
         !('_1 < 4),
-- 
GitLab