From f3ff1eb2985ff3e1567645b898f6b42e4b01f237 Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Mon, 16 Feb 2015 15:54:01 -0800
Subject: [PATCH] [SPARK-5839][SQL] HiveMetastoreCatalog does not recognize
 table names and aliases of data source tables.

JIRA: https://issues.apache.org/jira/browse/SPARK-5839
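
Before this patch, attribute qualifiers (the table name or a user-supplied
alias) failed to resolve against data source tables, because lookupRelation
returned the cached relation without wrapping it in a Subquery. A minimal
reproduction, adapted from the regression test added below (the table and
column names are illustrative):

    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
    jsonRDD(rdd).saveAsTable("savedJsonTable")
    // Both queries failed to resolve the qualifier before this patch:
    sql("SELECT * FROM savedJsonTable WHERE savedJsonTable.a < 5")
    sql("SELECT * FROM savedJsonTable tmp WHERE tmp.a > 5")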

Author: Yin Huai <yhuai@databricks.com>

Closes #4626 from yhuai/SPARK-5839 and squashes the following commits:

f779d85 [Yin Huai] Use subquery to wrap replaced ParquetRelation.
2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839
f1ba6ca [Yin Huai] Address comment.
2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table.
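
The fix wraps the relation returned from cachedDataSourceTables in a
Subquery carrying either the user-supplied alias or the table name, so
qualifiers resolve the same way they do for ordinary Hive tables. A sketch
of the core change in HiveMetastoreCatalog (condensed from the diff below):

    val dataSourceTable =
      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
    // Wrap with the alias if one was given; otherwise use the table name.
    alias
      .map(a => Subquery(a, dataSourceTable))
      .getOrElse(Subquery(tableIdent.last, dataSourceTable))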
---
 .../spark/sql/parquet/ParquetQuerySuite.scala |  5 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 18 ++++++++--
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 34 +++++++++++++++++++
 3 files changed, 53 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9318c15520..8b4d05ec54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.test.TestSQLContext._
 class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
   val sqlContext = TestSQLContext
 
-  test("simple projection") {
+  test("simple select queries") {
     withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
-      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 5).map(Row.apply(_)))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 72211fe2e4..87bc9fe4fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
 
     if (table.getProperty("spark.sql.sources.provider") != null) {
-      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable =
+        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      // If an alias is specified, wrap the table with a Subquery using the alias.
+      // Otherwise, wrap the table with a Subquery using the table name.
+      val withAlias =
+        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
+          Subquery(tableIdent.last, dataSourceTable))
+
+      withAlias
     } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
@@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
 
         lastPlan.transformUp {
-          case r: MetastoreRelation if r == relation => parquetRelation
+          case r: MetastoreRelation if r == relation => {
+            val withAlias =
+              r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+                Subquery(r.tableName, parquetRelation))
+
+            withAlias
+          }
           case other => other.transformExpressions {
             case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 375aae5d51..0263e3bb56 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE jsonTable").collect().foreach(println)
   }
 
+  test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
+    val originalDefaultSource = conf.defaultDataSourceName
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val df = jsonRDD(rdd)
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+    // Save the df as a managed table (by not specifying the path).
+    df.saveAsTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    invalidateTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    // Drop table will also delete the data.
+    sql("DROP TABLE savedJsonTable")
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+  }
+
   test("save table") {
     val originalDefaultSource = conf.defaultDataSourceName
 
-- 
GitLab