From 2cac3b2d4a4a4f3d0d45af4defc23bb0ba53484b Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Wed, 28 Sep 2016 00:50:12 +0800
Subject: [PATCH] [SPARK-16516][SQL] Support for pushing down filters for
 decimal and timestamp types in ORC

## What changes were proposed in this pull request?

It seems ORC supports all the types in  ([`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56)) which includes timestamp type and decimal type.

In more details, the types listed in [`SearchArgumentImpl.boxLiteral()`](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) can be used as a filter value.

FYI, inital `case` caluse for supported types was introduced in https://github.com/apache/spark/commit/65d71bd9fbfe6fe1b741c80fed72d6ae3d22b028 and this was not changed overtime. At that time, Hive version was, 0.13 which supports only some types for filter-push down (See [SearchArgumentImpl.java#L945-L965](https://github.com/apache/hive/blob/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L945-L965) at 0.13).

However, the version was upgraded into 1.2.x and now it supports more types (See [SearchArgumentImpl.java#L1068-L1093](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) at 1.2.0)

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14172 from HyukjinKwon/SPARK-16516.
---
 .../spark/sql/hive/orc/OrcFilters.scala       |  1 +
 .../spark/sql/hive/orc/OrcFilterSuite.scala   | 62 ++++++++++++++++---
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 35 +++++++++++
 3 files changed, 89 insertions(+), 9 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 6ab8244559..d9efd0cb45 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -84,6 +84,7 @@ private[orc] object OrcFilters extends Logging {
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
       case ByteType | ShortType | FloatType | DoubleType => true
       case IntegerType | LongType | StringType | BooleanType => true
+      case TimestampType | _: DecimalType => true
       case _ => false
     }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 471192a369..222c24927a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -229,6 +229,59 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     }
   }
 
+  test("filter pushdown - decimal") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(3)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - timestamp") {
+    val timeString = "2015-08-20 14:57:00"
+    val timestamps = (1 to 4).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      new Timestamp(milliseconds)
+    }
+    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+
+      checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+
+      checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+
+      checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
   test("filter pushdown - combinations with logical operators") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -277,19 +330,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
       checkNoFilterPredicate('_1.isNull)
     }
-    // DecimalType
-    withOrcDataFrame((1 to 4).map(i => Tuple1(BigDecimal.valueOf(i)))) { implicit df =>
-      checkNoFilterPredicate('_1 <= BigDecimal.valueOf(4))
-    }
     // BinaryType
     withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
       checkNoFilterPredicate('_1 <=> 1.b)
     }
-    // TimestampType
-    val stringTimestamp = "2015-08-20 15:57:00"
-    withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>
-      checkNoFilterPredicate('_1 <=> Timestamp.valueOf(stringTimestamp))
-    }
     // DateType
     val stringDate = "2015-01-01"
     withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index b13878d578..b2ee49c441 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.orc
 
 import java.nio.charset.StandardCharsets
+import java.sql.Timestamp
 
 import org.scalatest.BeforeAndAfterAll
 
@@ -500,6 +501,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
 
+  test("Support for pushing down filters for decimal types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
+  test("Support for pushing down filters for timestamp types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val timeString = "2015-08-20 14:57:00"
+      val data = (0 until 10).map { i =>
+        val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+        Tuple1(new Timestamp(milliseconds))
+      }
+      withTempPath { file =>
+        // It needs to repartition data so that we can have several ORC files
+        // in order to skip stripes in ORC.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
+        val actual = stripSparkFilter(df).count()
+
+        assert(actual < 10)
+      }
+    }
+  }
+
   test("column nullability and comment - write and then read") {
     val schema = (new StructType)
       .add("cl1", IntegerType, nullable = false, comment = "test")
-- 
GitLab