From ba5f7e1842f2c5852b5309910c0d39926643da69 Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Thu, 20 Aug 2015 08:13:25 +0800
Subject: [PATCH] [SPARK-10035] [SQL] Parquet filters does not process
 EqualNullSafe filter.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

As I talked with Lian,

1. I added EquelNullSafe to ParquetFilters
 - It uses the same equality comparison filter with EqualTo since the Parquet filter performs actually null-safe equality comparison.

2. Updated the test code (ParquetFilterSuite)
 - Convert catalyst.Expression to sources.Filter
 - Removed Cast since only Literal is picked up as a proper Filter in DataSourceStrategy
 - Added EquelNullSafe comparison

3. Removed deprecated createFilter for catalyst.Expression

Author: hyukjinkwon <gurwls223@gmail.com>
Author: 권혁진 <gurwls223@gmail.com>

Closes #8275 from HyukjinKwon/master.
---
 .../datasources/parquet/ParquetFilters.scala  | 113 +++---------------
 .../parquet/ParquetFilterSuite.scala          |  63 ++++------
 2 files changed, 37 insertions(+), 139 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 63915e0a28..c74c838863 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer
 
 import com.google.common.io.BaseEncoding
 import org.apache.hadoop.conf.Configuration
-import org.apache.parquet.filter2.compat.FilterCompat
-import org.apache.parquet.filter2.compat.FilterCompat._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.io.api.Binary
@@ -39,12 +37,6 @@ import org.apache.spark.unsafe.types.UTF8String
 private[sql] object ParquetFilters {
   val PARQUET_FILTER_DATA = "org.apache.spark.sql.parquet.row.filter"
 
-  def createRecordFilter(filterExpressions: Seq[Expression]): Option[Filter] = {
-    filterExpressions.flatMap { filter =>
-      createFilter(filter)
-    }.reduceOption(FilterApi.and).map(FilterCompat.get)
-  }
-
   case class SetInFilter[T <: Comparable[T]](
     valueSet: Set[T]) extends UserDefinedPredicate[T] with Serializable {
 
@@ -205,6 +197,16 @@ private[sql] object ParquetFilters {
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
     // which can be casted to `false` implicitly. Please refer to the `eval` method of these
     // operators and the `SimplifyFilters` rule for details.
+
+    // Hyukjin:
+    // I added [[EqualNullSafe]] with [[org.apache.parquet.filter2.predicate.Operators.Eq]].
+    // So, it performs equality comparison identically when given [[sources.Filter]] is [[EqualTo]].
+    // The reason why I did this is, that the actual Parquet filter checks null-safe equality
+    // comparison.
+    // So I added this and maybe [[EqualTo]] should be changed. It still seems fine though, because
+    // physical planning does not set `NULL` to [[EqualTo]] but changes it to [[IsNull]] and etc.
+    // Probably I missed something and obviously this should be changed.
+
     predicate match {
       case sources.IsNull(name) =>
         makeEq.lift(dataTypeOf(name)).map(_(name, null))
@@ -216,6 +218,11 @@ private[sql] object ParquetFilters {
       case sources.Not(sources.EqualTo(name, value)) =>
         makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
 
+      case sources.EqualNullSafe(name, value) =>
+        makeEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) =>
+        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
+
       case sources.LessThan(name, value) =>
         makeLt.lift(dataTypeOf(name)).map(_(name, value))
       case sources.LessThanOrEqual(name, value) =>
@@ -273,96 +280,6 @@ private[sql] object ParquetFilters {
     addMethod.invoke(null, classOf[Binary], enumTypeDescriptor)
   }
 
-  /**
-   * Converts Catalyst predicate expressions to Parquet filter predicates.
-   *
-   * @todo This can be removed once we get rid of the old Parquet support.
-   */
-  def createFilter(predicate: Expression): Option[FilterPredicate] = {
-    // NOTE:
-    //
-    // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
-    // which can be casted to `false` implicitly. Please refer to the `eval` method of these
-    // operators and the `SimplifyFilters` rule for details.
-    predicate match {
-      case IsNull(NamedExpression(name, dataType)) =>
-        makeEq.lift(dataType).map(_(name, null))
-      case IsNotNull(NamedExpression(name, dataType)) =>
-        makeNotEq.lift(dataType).map(_(name, null))
-
-      case EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeEq.lift(dataType).map(_(name, value))
-      case EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeEq.lift(dataType).map(_(name, value))
-
-      case Not(EqualTo(NamedExpression(name, _), NonNullLiteral(value, dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, dataType), NamedExpression(name, _))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-      case Not(EqualTo(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType))) =>
-        makeNotEq.lift(dataType).map(_(name, value))
-
-      case LessThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case LessThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-
-      case LessThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case LessThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-
-      case GreaterThan(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLt.lift(dataType).map(_(name, value))
-      case GreaterThan(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLt.lift(dataType).map(_(name, value))
-
-      case GreaterThanOrEqual(NamedExpression(name, _), NonNullLiteral(value, dataType)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(Cast(NamedExpression(name, _), dataType), NonNullLiteral(value, _)) =>
-        makeGtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, dataType), NamedExpression(name, _)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-      case GreaterThanOrEqual(NonNullLiteral(value, _), Cast(NamedExpression(name, _), dataType)) =>
-        makeLtEq.lift(dataType).map(_(name, value))
-
-      case And(lhs, rhs) =>
-        (createFilter(lhs) ++ createFilter(rhs)).reduceOption(FilterApi.and)
-
-      case Or(lhs, rhs) =>
-        for {
-          lhsFilter <- createFilter(lhs)
-          rhsFilter <- createFilter(rhs)
-        } yield FilterApi.or(lhsFilter, rhsFilter)
-
-      case Not(pred) =>
-        createFilter(pred).map(FilterApi.not)
-
-      case InSet(NamedExpression(name, dataType), valueSet) =>
-        makeInSet.lift(dataType).map(_(name, valueSet))
-
-      case _ => None
-    }
-  }
-
   /**
    * Note: Inside the Hadoop API we only have access to `Configuration`, not to
    * [[org.apache.spark.SparkContext]], so we cannot use broadcasts to convey
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 5b4e568bb9..f067112cfc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.{Column, DataFrame, QueryTest, Row, SQLConf}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation}
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types._
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -55,20 +54,22 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         .select(output.map(e => Column(e)): _*)
         .where(Column(predicate))
 
-      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+      val analyzedPredicate = query.queryExecution.optimizedPlan.collect {
         case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation)) => filters
-      }.flatten.reduceOption(_ && _)
+      }.flatten
+      assert(analyzedPredicate.nonEmpty)
 
-      assert(maybeAnalyzedPredicate.isDefined)
-      maybeAnalyzedPredicate.foreach { pred =>
-        val maybeFilter = ParquetFilters.createFilter(pred)
+      val selectedFilters = DataSourceStrategy.selectFilters(analyzedPredicate)
+      assert(selectedFilters.nonEmpty)
+
+      selectedFilters.foreach { pred =>
+        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
         assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
         maybeFilter.foreach { f =>
           // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
           assert(f.getClass === filterClass)
         }
       }
-
       checker(query, expected)
     }
   }
@@ -109,43 +110,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], Seq(Row(true), Row(false)))
 
       checkFilterPredicate('_1 === true, classOf[Eq[_]], true)
+      checkFilterPredicate('_1 <=> true, classOf[Eq[_]], true)
       checkFilterPredicate('_1 !== true, classOf[NotEq[_]], false)
     }
   }
 
-  test("filter pushdown - short") {
-    withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i.toShort)))) { implicit df =>
-      checkFilterPredicate(Cast('_1, IntegerType) === 1, classOf[Eq[_]], 1)
-      checkFilterPredicate(
-        Cast('_1, IntegerType) !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
-
-      checkFilterPredicate(Cast('_1, IntegerType) < 2, classOf[Lt[_]], 1)
-      checkFilterPredicate(Cast('_1, IntegerType) > 3, classOf[Gt[_]], 4)
-      checkFilterPredicate(Cast('_1, IntegerType) <= 1, classOf[LtEq[_]], 1)
-      checkFilterPredicate(Cast('_1, IntegerType) >= 4, classOf[GtEq[_]], 4)
-
-      checkFilterPredicate(Literal(1) === Cast('_1, IntegerType), classOf[Eq[_]], 1)
-      checkFilterPredicate(Literal(2) > Cast('_1, IntegerType), classOf[Lt[_]], 1)
-      checkFilterPredicate(Literal(3) < Cast('_1, IntegerType), classOf[Gt[_]], 4)
-      checkFilterPredicate(Literal(1) >= Cast('_1, IntegerType), classOf[LtEq[_]], 1)
-      checkFilterPredicate(Literal(4) <= Cast('_1, IntegerType), classOf[GtEq[_]], 4)
-
-      checkFilterPredicate(!(Cast('_1, IntegerType) < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate(
-        Cast('_1, IntegerType) > 2 && Cast('_1, IntegerType) < 4, classOf[Operators.And], 3)
-      checkFilterPredicate(
-        Cast('_1, IntegerType) < 2 || Cast('_1, IntegerType) > 3,
-        classOf[Operators.Or],
-        Seq(Row(1), Row(4)))
-    }
-  }
-
   test("filter pushdown - integer") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
       checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -154,13 +130,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
       checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
       checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
       checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
       checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
       checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
     }
   }
@@ -171,6 +147,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
       checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -179,13 +156,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
       checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
       checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
       checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
       checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
       checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
     }
   }
@@ -196,6 +173,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
       checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -204,13 +182,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
       checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
       checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
       checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
       checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
       checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
     }
   }
@@ -221,6 +199,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 === 1, classOf[Eq[_]], 1)
+      checkFilterPredicate('_1 <=> 1, classOf[Eq[_]], 1)
       checkFilterPredicate('_1 !== 1, classOf[NotEq[_]], (2 to 4).map(Row.apply(_)))
 
       checkFilterPredicate('_1 < 2, classOf[Lt[_]], 1)
@@ -229,13 +208,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1 >= 4, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(Literal(1) === '_1, classOf[Eq[_]], 1)
+      checkFilterPredicate(Literal(1) <=> '_1, classOf[Eq[_]], 1)
       checkFilterPredicate(Literal(2) > '_1, classOf[Lt[_]], 1)
       checkFilterPredicate(Literal(3) < '_1, classOf[Gt[_]], 4)
       checkFilterPredicate(Literal(1) >= '_1, classOf[LtEq[_]], 1)
       checkFilterPredicate(Literal(4) <= '_1, classOf[GtEq[_]], 4)
 
       checkFilterPredicate(!('_1 < 4), classOf[GtEq[_]], 4)
-      checkFilterPredicate('_1 > 2 && '_1 < 4, classOf[Operators.And], 3)
       checkFilterPredicate('_1 < 2 || '_1 > 3, classOf[Operators.Or], Seq(Row(1), Row(4)))
     }
   }
@@ -247,6 +226,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         '_1.isNotNull, classOf[NotEq[_]], (1 to 4).map(i => Row.apply(i.toString)))
 
       checkFilterPredicate('_1 === "1", classOf[Eq[_]], "1")
+      checkFilterPredicate('_1 <=> "1", classOf[Eq[_]], "1")
       checkFilterPredicate(
         '_1 !== "1", classOf[NotEq[_]], (2 to 4).map(i => Row.apply(i.toString)))
 
@@ -256,13 +236,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkFilterPredicate('_1 >= "4", classOf[GtEq[_]], "4")
 
       checkFilterPredicate(Literal("1") === '_1, classOf[Eq[_]], "1")
+      checkFilterPredicate(Literal("1") <=> '_1, classOf[Eq[_]], "1")
       checkFilterPredicate(Literal("2") > '_1, classOf[Lt[_]], "1")
       checkFilterPredicate(Literal("3") < '_1, classOf[Gt[_]], "4")
       checkFilterPredicate(Literal("1") >= '_1, classOf[LtEq[_]], "1")
       checkFilterPredicate(Literal("4") <= '_1, classOf[GtEq[_]], "4")
 
       checkFilterPredicate(!('_1 < "4"), classOf[GtEq[_]], "4")
-      checkFilterPredicate('_1 > "2" && '_1 < "4", classOf[Operators.And], "3")
       checkFilterPredicate('_1 < "2" || '_1 > "3", classOf[Operators.Or], Seq(Row("1"), Row("4")))
     }
   }
@@ -274,6 +254,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
     withParquetDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
       checkBinaryFilterPredicate('_1 === 1.b, classOf[Eq[_]], 1.b)
+      checkBinaryFilterPredicate('_1 <=> 1.b, classOf[Eq[_]], 1.b)
 
       checkBinaryFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
       checkBinaryFilterPredicate(
@@ -288,13 +269,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       checkBinaryFilterPredicate('_1 >= 4.b, classOf[GtEq[_]], 4.b)
 
       checkBinaryFilterPredicate(Literal(1.b) === '_1, classOf[Eq[_]], 1.b)
+      checkBinaryFilterPredicate(Literal(1.b) <=> '_1, classOf[Eq[_]], 1.b)
       checkBinaryFilterPredicate(Literal(2.b) > '_1, classOf[Lt[_]], 1.b)
       checkBinaryFilterPredicate(Literal(3.b) < '_1, classOf[Gt[_]], 4.b)
       checkBinaryFilterPredicate(Literal(1.b) >= '_1, classOf[LtEq[_]], 1.b)
       checkBinaryFilterPredicate(Literal(4.b) <= '_1, classOf[GtEq[_]], 4.b)
 
       checkBinaryFilterPredicate(!('_1 < 4.b), classOf[GtEq[_]], 4.b)
-      checkBinaryFilterPredicate('_1 > 2.b && '_1 < 4.b, classOf[Operators.And], 3.b)
       checkBinaryFilterPredicate(
         '_1 < 2.b || '_1 > 3.b, classOf[Operators.Or], Seq(Row(1.b), Row(4.b)))
     }
-- 
GitLab