From 8fb3d5c6da30922458091837eec17ccca502098a Mon Sep 17 00:00:00 2001
From: hyukjinkwon <gurwls223@gmail.com>
Date: Thu, 18 May 2017 10:52:23 -0700
Subject: [PATCH] [SPARK-20364][SQL] Disable Parquet predicate pushdown for
 fields having dots in the names

## What changes were proposed in this pull request?

This is an alternative workaround that simply avoids predicate pushdown for columns whose names contain dots. It is a different approach from https://github.com/apache/spark/pull/17680.

The downside of this PR is that it does not push down filters on columns with dots in Parquet files at all (neither at the record level nor at the row-group level), whereas the downside of the approach in that PR is that it does not use Parquet's API properly but relies on a hacky way to support this case.

I assume we prefer the safe way here, using the Parquet API properly, but this does close that PR since here we are basically just avoiding the pushdown.

This looks like a simple workaround and is probably fine given that the problem is arguably a rather corner case (although it may end up reading whole row groups under the hood, so neither option is ideal).
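
The core idea is a small guard before converting a data source filter into a Parquet predicate. The sketch below is a simplified illustration of that guard, not the actual patch code; the `canPushDown` helper and the schemas here are only for illustration:

```scala
import org.apache.spark.sql.types._

// Simplified illustration: a filter is only converted into a Parquet predicate
// when the referenced column exists in the schema AND its name contains no dot.
// Filters on dotted columns are simply left to Spark's normal row-level filtering.
def canPushDown(schema: StructType, name: String): Boolean =
  schema.fieldNames.contains(name) && !name.contains(".")

val dotted = StructType(Seq(StructField("col.dots", IntegerType)))
assert(!canPushDown(dotted, "col.dots"))   // skipped: the name contains a dot
assert(!canPushDown(dotted, "missing"))    // skipped: not in the schema
assert(canPushDown(StructType(Seq(StructField("coldots", IntegerType))), "coldots"))
```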

Currently, if there are dots in the column name, predicate pushdown against Parquet returns incorrect results.

**With dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
+--------+
```

**Without dots**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("coldots").write.parquet(path)
spark.read.parquet(path).where("`coldots` IS NOT NULL").show()
```

```
+-------+
|coldots|
+-------+
|      1|
+-------+
```

**After**

```scala
val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```

```
+--------+
|col.dots|
+--------+
|       1|
+--------+
```
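
For reference, on Spark versions without this fix, the same wrong result can be avoided by disabling Parquet filter pushdown entirely via the existing `spark.sql.parquet.filterPushdown` configuration; this is only a coarse workaround since it turns off pushdown for every column, not just dotted ones. A sketch (assuming a `spark-shell` session, as in the examples above):

```scala
// Coarse workaround on versions without this fix: turn off Parquet filter
// pushdown globally, at the cost of losing pushdown for all columns.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")

val path = "/tmp/abcde"
Seq(Some(1), None).toDF("col.dots").write.parquet(path)
// With pushdown disabled, the non-null row is returned as expected.
spark.read.parquet(path).where("`col.dots` IS NOT NULL").show()
```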

## How was this patch tested?

Unit tests added in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18000 from HyukjinKwon/SPARK-20364-workaround.
---
 .../datasources/parquet/ParquetFilters.scala  | 57 +++++++++++--------
 .../parquet/ParquetFilterSuite.scala          | 15 +++++
 2 files changed, 47 insertions(+), 25 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6a6cef586..763841efbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameToType = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates
+    // with missing columns, so pushing down a filter for a column whose name contains dots
+    // can make Parquet return incorrect results. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
 
     // NOTE:
     //
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.IsNull(name) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, null))
+      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+        makeLt.lift(nameToType(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeLtEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+        makeGt.lift(nameToType(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeGtEq.lift(nameToType(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index dd53b56132..98427cfe30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
+    import testImplicits._
+
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
+        withTempPath { path =>
+          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
+          val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
+          assert(readBack.count() == 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
-- 
GitLab