Commit 2cac3b2d authored by hyukjinkwon, committed by Cheng Lian

[SPARK-16516][SQL] Support for pushing down filters for decimal and timestamp types in ORC

## What changes were proposed in this pull request?

It seems ORC supports all the types in [`PredicateLeaf.Type`](https://github.com/apache/hive/blob/e085b7e9bd059d91aaf013df0db4d71dca90ec6f/storage-api/src/java/org/apache/hadoop/hive/ql/io/sarg/PredicateLeaf.java#L50-L56), which include the timestamp and decimal types.

In more detail, the types listed in [`SearchArgumentImpl.boxLiteral()`](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) can be used as filter values.

FYI, the initial `case` clause for supported types was introduced in https://github.com/apache/spark/commit/65d71bd9fbfe6fe1b741c80fed72d6ae3d22b028 and has not changed since. At that time, the Hive version was 0.13, which supported only some types for filter push-down (see [SearchArgumentImpl.java#L945-L965](https://github.com/apache/hive/blob/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L945-L965) at 0.13).

However, the Hive version has since been upgraded to 1.2.x, which supports more types (see [SearchArgumentImpl.java#L1068-L1093](https://github.com/apache/hive/blob/branch-1.2/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java#L1068-L1093) at 1.2.0).
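
To illustrate the effect, here is a minimal sketch (not part of the patch) of the user-facing behavior this enables, assuming the existing `spark.sql.orc.filterPushdown` configuration and an illustrative path `/tmp/orc-decimals`:

```scala
// Sketch only: a filter over a decimal column can now be pushed down
// into the ORC reader instead of being evaluated only by Spark.
spark.conf.set("spark.sql.orc.filterPushdown", "true")

val df = spark.range(10).selectExpr("CAST(id AS DECIMAL(10, 2)) AS a")
df.write.mode("overwrite").orc("/tmp/orc-decimals")

// With this patch, `a <= 4` is convertible to an ORC SearchArgument.
spark.read.orc("/tmp/orc-decimals").where("a <= 4").show()
```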

## How was this patch tested?

Unit tests in `OrcFilterSuite` and `OrcQuerySuite`

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14172 from HyukjinKwon/SPARK-16516.
parent 5de1737b
@@ -84,6 +84,7 @@ private[orc] object OrcFilters extends Logging {
       // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method.
       case ByteType | ShortType | FloatType | DoubleType => true
       case IntegerType | LongType | StringType | BooleanType => true
+      case TimestampType | _: DecimalType => true
       case _ => false
     }
...
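For context, after this one-line addition the type whitelist reads roughly as follows (a sketch assembled from the hunk above; the enclosing method name `isSearchableType` is not shown in the diff and is assumed here):

```scala
// Assumed helper shape: returns true only for Catalyst types whose literals
// can be boxed by Hive's `SearchArgumentImpl.BuilderImpl.boxLiteral()`.
private def isSearchableType(dataType: DataType): Boolean = dataType match {
  case ByteType | ShortType | FloatType | DoubleType => true
  case IntegerType | LongType | StringType | BooleanType => true
  case TimestampType | _: DecimalType => true // newly supported by this patch
  case _ => false
}
```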
@@ -229,6 +229,59 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     }
   }
+
+  test("filter pushdown - decimal") {
+    withOrcDataFrame((1 to 4).map(i => Tuple1.apply(BigDecimal.valueOf(i)))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+      checkFilterPredicate('_1 === BigDecimal.valueOf(1), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> BigDecimal.valueOf(1), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate('_1 < BigDecimal.valueOf(2), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > BigDecimal.valueOf(3), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= BigDecimal.valueOf(1), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= BigDecimal.valueOf(4), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(2)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(3)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(1)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(
+        Literal(BigDecimal.valueOf(4)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
+  test("filter pushdown - timestamp") {
+    val timeString = "2015-08-20 14:57:00"
+    val timestamps = (1 to 4).map { i =>
+      val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+      new Timestamp(milliseconds)
+    }
+    withOrcDataFrame(timestamps.map(Tuple1(_))) { implicit df =>
+      checkFilterPredicate('_1.isNull, PredicateLeaf.Operator.IS_NULL)
+      checkFilterPredicate('_1 === timestamps(0), PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate('_1 <=> timestamps(0), PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate('_1 < timestamps(1), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate('_1 > timestamps(2), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 <= timestamps(0), PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate('_1 >= timestamps(3), PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(0)) === '_1, PredicateLeaf.Operator.EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) <=> '_1, PredicateLeaf.Operator.NULL_SAFE_EQUALS)
+      checkFilterPredicate(Literal(timestamps(1)) > '_1, PredicateLeaf.Operator.LESS_THAN)
+      checkFilterPredicate(Literal(timestamps(2)) < '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(0)) >= '_1, PredicateLeaf.Operator.LESS_THAN_EQUALS)
+      checkFilterPredicate(Literal(timestamps(3)) <= '_1, PredicateLeaf.Operator.LESS_THAN)
+    }
+  }
+
   test("filter pushdown - combinations with logical operators") {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Option(i)))) { implicit df =>
       // Because `ExpressionTree` is not accessible at Hive 1.2.x, this should be checked
@@ -277,19 +330,10 @@ class OrcFilterSuite extends QueryTest with OrcTest {
     withOrcDataFrame((1 to 4).map(i => Tuple1(Array(i)))) { implicit df =>
       checkNoFilterPredicate('_1.isNull)
     }
-    // DecimalType
-    withOrcDataFrame((1 to 4).map(i => Tuple1(BigDecimal.valueOf(i)))) { implicit df =>
-      checkNoFilterPredicate('_1 <= BigDecimal.valueOf(4))
-    }
     // BinaryType
     withOrcDataFrame((1 to 4).map(i => Tuple1(i.b))) { implicit df =>
       checkNoFilterPredicate('_1 <=> 1.b)
     }
-    // TimestampType
-    val stringTimestamp = "2015-08-20 15:57:00"
-    withOrcDataFrame(Seq(Tuple1(Timestamp.valueOf(stringTimestamp)))) { implicit df =>
-      checkNoFilterPredicate('_1 <=> Timestamp.valueOf(stringTimestamp))
-    }
     // DateType
     val stringDate = "2015-01-01"
     withOrcDataFrame(Seq(Tuple1(Date.valueOf(stringDate)))) { implicit df =>
...
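A note on the expectations in the tests above: `>` and `>=` predicates are asserted against `LESS_THAN_EQUALS` and `LESS_THAN` because Hive's `SearchArgument` builder has no greater-than leaf, so Spark builds the negation instead. A sketch of that conversion (assuming the builder-based conversion in `OrcFilters`, which is not shown in this diff):

```scala
// Sketch: `a > v` is expressed as NOT(a <= v), so the generated leaf
// reports the LESS_THAN_EQUALS operator wrapped in a negation.
case GreaterThan(attribute, value) =>
  Some(builder.startNot().lessThanEquals(attribute, value).end())

// Likewise `a >= v` becomes NOT(a < v), reporting LESS_THAN.
case GreaterThanOrEqual(attribute, value) =>
  Some(builder.startNot().lessThan(attribute, value).end())
```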
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.orc

 import java.nio.charset.StandardCharsets
+import java.sql.Timestamp

 import org.scalatest.BeforeAndAfterAll
@@ -500,6 +501,40 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
     }
   }
+
+  test("Support for pushing down filters for decimal types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
+      withTempPath { file =>
+        // Repartition the data so that it is written as several ORC files,
+        // giving ORC a chance to skip whole files/stripes when filtering.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
+        val actual = stripSparkFilter(df).count()
+        assert(actual < 10)
+      }
+    }
+  }
+
+  test("Support for pushing down filters for timestamp types") {
+    withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      val timeString = "2015-08-20 14:57:00"
+      val data = (0 until 10).map { i =>
+        val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
+        Tuple1(new Timestamp(milliseconds))
+      }
+      withTempPath { file =>
+        // Repartition the data so that it is written as several ORC files,
+        // giving ORC a chance to skip whole files/stripes when filtering.
+        createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
+        val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
+        val actual = stripSparkFilter(df).count()
+        assert(actual < 10)
+      }
+    }
+  }
+
   test("column nullability and comment - write and then read") {
     val schema = (new StructType)
       .add("cl1", IntegerType, nullable = false, comment = "test")
...
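As background on the assertion pattern in the two new `OrcQuerySuite` tests: `stripSparkFilter` removes Spark's own `Filter` operator from the plan, so the resulting count reflects only the rows the ORC reader actually returned. Since the data is spread across ten files and the predicate matches a single value, a count below ten shows that the reader skipped data rather than Spark filtering it afterwards. A condensed sketch of the pattern, using names from the tests above:

```scala
// Read back the ten ORC files with a filter that should be pushed down.
val df = spark.read.orc(file.getCanonicalPath).where("a == 2")

// Strip Spark's Filter operator: whatever remains was filtered by ORC itself,
// so a count below 10 proves the reader skipped files or stripes.
val rowsReturnedByOrc = stripSparkFilter(df).count()
assert(rowsReturnedByOrc < 10)
```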