From 89af6dfc3afb2b8fc60fa74afb52541dbf3c4e8f Mon Sep 17 00:00:00 2001
From: wangfei <wangfei1@huawei.com>
Date: Mon, 27 Oct 2014 20:46:20 -0700
Subject: [PATCH] [SPARK-4041][SQL] Attribute names in table scan should be
 converted to lowercase when compared with relation attributes

In ```MetastoreRelation``` the attribute names are lowercase because Hive stores field names in lowercase, so attribute names in the table scan should be lowercased before the comparison in ```indexWhere(_.name == a.name)```.
Otherwise ```neededColumnIDs``` may be incorrect when a query references columns with different capitalization.
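
For illustration, a minimal standalone sketch of the failure mode (not part of this patch; the ```Attr``` class is a hypothetical stand-in for Catalyst's ```Attribute```):

```scala
// Sketch only: shows why a case-sensitive name comparison drops column IDs,
// and how keying the lookup on the expression ID avoids the problem.
object Spark4041Sketch {
  // Hypothetical stand-in for Catalyst's Attribute (expression ID + name).
  case class Attr(exprId: Long, name: String)

  def main(args: Array[String]): Unit = {
    // The metastore relation stores field names lowercased, as Hive does.
    val relationAttrs = Seq(Attr(1L, "key"), Attr(2L, "value"))
    // The requested attributes may carry the user's capitalization.
    val requested = Seq(Attr(1L, "KEY"))

    // Old approach: case-sensitive name comparison finds no match.
    val byName = requested
      .map(a => relationAttrs.indexWhere(_.name == a.name))
      .filter(_ >= 0)
    println(byName) // List() -- the needed column ID is silently dropped

    // Patched approach: look the ordinal up by expression ID instead.
    val ordinals = relationAttrs.map(_.exprId).zipWithIndex.toMap
    val byId = requested.flatMap(a => ordinals.get(a.exprId))
    println(byId) // List(0) -- correct column ID regardless of case
  }
}
```

Keying the lookup on the expression ID, as Catalyst's ```AttributeMap``` does, makes the ordinal lookup insensitive to how the query capitalized the column name.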

Author: wangfei <wangfei1@huawei.com>
Author: scwf <wangfei1@huawei.com>

Closes #2884 from scwf/fixColumnIds and squashes the following commits:

6174046 [scwf] use AttributeMap for this issue
dc74a24 [wangfei] use lowerName and add a test case for this issue
3ff3a80 [wangfei] more safer change
294fcb7 [scwf] attributes names in table scan should convert lowercase in neededColumnsIDs
---
 .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 6 ++++++
 .../apache/spark/sql/hive/execution/HiveTableScan.scala  | 9 +++++----
 .../spark/sql/hive/execution/HiveTableScanSuite.scala    | 9 +++++++++
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 04c48c3859..39d87a9d14 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -307,4 +307,10 @@ private[hive] case class MetastoreRelation
   val attributes = hiveQlTable.getCols.map(_.toAttribute) 
 
   val output = attributes ++ partitionKeys
+
+  /** An attribute map that can be used to look up original attributes based on expression id. */
+  val attributeMap = AttributeMap(output.map(o => (o, o)))
+
+  /** An attribute map for determining the ordinal for non-partition columns. */
+  val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index b7f3ade4ea..d39413a44a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -44,7 +44,7 @@ import org.apache.spark.sql.hive._
  */
 @DeveloperApi
 case class HiveTableScan(
-    attributes: Seq[Attribute],
+    requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Option[Expression])(
     @transient val context: HiveContext)
@@ -53,6 +53,9 @@ case class HiveTableScan(
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
     "Partition pruning predicates only supported for partitioned tables.")
 
+  // Retrieve the original attributes based on expression ID so that capitalization matches.
+  val attributes = requestedAttributes.map(relation.attributeMap)
+
   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
   private[this] val boundPruningPred = partitionPruningPred.map { pred =>
@@ -81,9 +84,7 @@ case class HiveTableScan(
 
   private def addColumnMetadataToConf(hiveConf: HiveConf) {
     // Specifies needed column IDs for those non-partitioning columns.
-    val neededColumnIDs =
-      attributes.map(a =>
-        relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0)
+    val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
 
     HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name))
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index c5736723b4..2f3db95882 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.hive.test.TestHive
+
 class HiveTableScanSuite extends HiveComparisonTest {
 
   createQueryTest("partition_based_table_scan_with_different_serde",
@@ -38,4 +40,11 @@ class HiveTableScanSuite extends HiveComparisonTest {
       |
       |SELECT * from part_scan_test;
     """.stripMargin)
+
+  test("Spark-4041: lowercase issue") {
+    TestHive.sql("CREATE TABLE tb (KEY INT, VALUE STRING) STORED AS ORC")
+    TestHive.sql("insert into table tb select key, value from src")
+    TestHive.sql("select KEY from tb where VALUE='just_for_test' limit 5").collect()
+    TestHive.sql("drop table tb")
+  }
 }
-- 
GitLab