From 0bf605c2c67ca361cd4aa3a3b4492bef4aef76b9 Mon Sep 17 00:00:00 2001
From: Wenchen Fan <wenchen@databricks.com>
Date: Thu, 19 Jan 2017 20:09:48 -0800
Subject: [PATCH] [SPARK-19292][SQL] filter with partition columns should be
 case-insensitive on Hive tables

## What changes were proposed in this pull request?

When we query a table with a filter on partitioned columns, we will push the
partition filter to the metastore to get matched partitions directly.

In `HiveExternalCatalog.listPartitionsByFilter`, we assume the column names in
partition filter are already normalized and we don't need to consider case
sensitivity. However, `HiveTableScanExec` doesn't follow this assumption. This
PR fixes it.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16647 from cloud-fan/bug.
---
 .../execution/datasources/FileSourceStrategy.scala |  2 +-
 .../sql/hive/execution/HiveTableScanExec.scala     | 12 +++++++++++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 13 +++++++++++++
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 6d0671d4cb..26e1380eca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -62,7 +62,7 @@ object FileSourceStrategy extends Strategy with Logging {
       val filterSet = ExpressionSet(filters)
 
       // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we donot need to
+      // case insensitive, we should change them to match the one in schema, so we do not need to
       // worry about case sensitivity anymore.
       val normalizedFilters = filters.map { e =>
         e transform {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 7ee5fc543c..def6ef3691 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -146,9 +146,19 @@ case class HiveTableScanExec(
         hadoopReader.makeRDDForTable(relation.hiveQlTable)
       }
     } else {
+      // The attribute name of predicate could be different than the one in schema in case of
+      // case insensitive, we should change them to match the one in schema, so we do not need to
+      // worry about case sensitivity anymore.
+      val normalizedFilters = partitionPruningPred.map { e =>
+        e transform {
+          case a: AttributeReference =>
+            a.withName(relation.output.find(_.semanticEquals(a)).get.name)
+        }
+      }
+
       Utils.withDummyCallSite(sqlContext.sparkContext) {
         hadoopReader.makeRDDForPartitionedTable(
-          prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+          prunePartitions(relation.getHiveQlPartitions(normalizedFilters)))
       }
     }
     val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 104b5250b6..1a28c4c84a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2014,4 +2014,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       )
     }
   }
+
+  test("SPARK-19292: filter with partition columns should be case-insensitive on Hive tables") {
+    withTable("tbl") {
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        sql("CREATE TABLE tbl(i int, j int) USING hive PARTITIONED BY (j)")
+        sql("INSERT INTO tbl PARTITION(j=10) SELECT 1")
+        checkAnswer(spark.table("tbl"), Row(1, 10))
+
+        checkAnswer(sql("SELECT i, j FROM tbl WHERE J=10"), Row(1, 10))
+        checkAnswer(spark.table("tbl").filter($"J" === 10), Row(1, 10))
+      }
+    }
+  }
 }
-- 
GitLab
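
For readers who want to see the normalization in isolation, here is a minimal, self-contained sketch of the idea behind `normalizedFilters`. The `Expr`/`Attr` types below are hypothetical stand-ins for Catalyst's `Expression` and `AttributeReference`, not Spark APIs: the analyzer has already resolved the user's attribute to the schema attribute by `exprId`, so `semanticEquals` can locate the canonical attribute even when the spelling differs only in case, and `withName` rewrites the reference to the exact schema name before the filter is pushed down.

```scala
// Hypothetical stand-ins for Catalyst's Expression and AttributeReference,
// kept dependency-free so the sketch runs with plain Scala.
sealed trait Expr
case class Attr(name: String, exprId: Long) extends Expr {
  // Like Catalyst's semanticEquals: ignore the cosmetic name, compare by exprId.
  def semanticEquals(other: Attr): Boolean = exprId == other.exprId
  def withName(newName: String): Attr = copy(name = newName)
}
case class Lit(value: Any) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr

object NormalizeDemo extends App {
  // Schema as declared: partition column `j` is bound to exprId 2.
  val output = Seq(Attr("i", 1), Attr("j", 2))

  // The user wrote `WHERE J = 10`; resolution matched `J` to exprId 2
  // but kept the user-facing spelling.
  val pred: Expr = EqualTo(Attr("J", 2), Lit(10))

  // The fix: rewrite every attribute to the exact name from the schema,
  // mirroring the `e transform { case a: AttributeReference => ... }` above.
  def normalize(e: Expr): Expr = e match {
    case a: Attr       => a.withName(output.find(_.semanticEquals(a)).get.name)
    case EqualTo(l, r) => EqualTo(normalize(l), normalize(r))
    case other         => other
  }

  println(normalize(pred)) // EqualTo(Attr(j,2),Lit(10)) -- canonical name `j`
}
```

Without this rewrite, the filter handed to `HiveExternalCatalog.listPartitionsByFilter` can name a column `J` that does not exist in the partition schema; with it, the pushed-down filter always uses the canonical name `j`, which is the behavior the regression test above exercises.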