From 980bba0dcfcecbfce701d765fb45bf174cea54ad Mon Sep 17 00:00:00 2001
From: xin Wu <xinwu@us.ibm.com>
Date: Mon, 9 May 2016 17:18:48 -0700
Subject: [PATCH] [SPARK-15025][SQL] fix duplicate of PATH key in datasource
 table options

## What changes were proposed in this pull request?
The issue is that when the user provides the path option with uppercase "PATH" key, `options` contains `PATH` key and will get into the non-external case in the following code in `createDataSourceTables.scala`, where a new key "path" is created with a default path.
```
val optionsWithPath =
  if (!options.contains("path")) {
    isExternal = false
    options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
  } else {
    options
  }
```
As a result, before the Hive table is created, `serdeInfo.parameters` contains both the "PATH" and "path" keys pointing at different directories, and the Hive table's dataLocation ends up using the value of "path".
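
To make the failure mode concrete, here is a minimal sketch (not part of the patch) of how the two keys diverge; the directory strings are illustrative placeholders, not values from the PR:
```
// User writes OPTIONS (PATH '/data/t'): the key arrives uppercase.
val options = Map("PATH" -> "/data/t")

val optionsWithPath =
  if (!options.contains("path")) {          // case-sensitive lookup misses "PATH"
    // a default managed location is added alongside the user's "PATH" entry
    options + ("path" -> "/user/hive/warehouse/t")
  } else {
    options
  }

// optionsWithPath == Map("PATH" -> "/data/t", "path" -> "/user/hive/warehouse/t"),
// and both entries later land in the Hive table's serdeInfo.parameters.
```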

The fix in this PR is to wrap `options` in a `CaseInsensitiveMap` before checking whether it contains the "path" key.
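
For reference, a minimal sketch of the case-insensitive lookup, assuming Spark's internal `CaseInsensitiveMap` as used in the diff below (the class is package-private to `org.apache.spark.sql`, so this only compiles inside that package; the path is illustrative):
```
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap

val options = Map("PATH" -> "/data/t")

options.contains("path")                          // false: plain Map lookup is case-sensitive
new CaseInsensitiveMap(options).contains("path")  // true: lookup ignores key case
```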

## How was this patch tested?
A test case is added.

Author: xin Wu <xinwu@us.ibm.com>

Closes #12804 from xwu0226/SPARK-15025.
---
 .../command/createDataSourceTables.scala      |  6 ++--
 .../sql/hive/MetastoreDataSourcesSuite.scala  | 29 +++++++++++++++++--
 2 files changed, 29 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 16d6115737..1494341d58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.datasources.{BucketSpec, DataSource, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.sources.InsertableRelation
 import org.apache.spark.sql.types._
@@ -84,7 +84,7 @@ case class CreateDataSourceTableCommand(
 
     var isExternal = true
     val optionsWithPath =
-      if (!options.contains("path") && managedIfNoPath) {
+      if (!new CaseInsensitiveMap(options).contains("path") && managedIfNoPath) {
         isExternal = false
         options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
       } else {
@@ -157,7 +157,7 @@ case class CreateDataSourceTableAsSelectCommand(
     var createMetastoreTable = false
     var isExternal = true
     val optionsWithPath =
-      if (!options.contains("path")) {
+      if (!new CaseInsensitiveMap(options).contains("path")) {
         isExternal = false
         options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
       } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 4bdcb96feb..78c8f0043d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -944,7 +944,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("CTAS: persisted partitioned data source table") {
-    withTempDir { dir =>
+    withTempPath { dir =>
       withTable("t") {
         val path = dir.getCanonicalPath
 
@@ -968,7 +968,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("CTAS: persisted bucketed data source table") {
-    withTempDir { dir =>
+    withTempPath { dir =>
       withTable("t") {
         val path = dir.getCanonicalPath
 
@@ -988,7 +988,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
         checkAnswer(table("t"), Row(1, 2))
       }
+    }
 
+    withTempPath { dir =>
       withTable("t") {
         val path = dir.getCanonicalPath
 
@@ -1012,7 +1014,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   }
 
   test("CTAS: persisted partitioned bucketed data source table") {
-    withTempDir { dir =>
+    withTempPath { dir =>
       withTable("t") {
         val path = dir.getCanonicalPath
 
@@ -1035,4 +1037,25 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
     }
   }
+
+  test("SPARK-15025: create datasource table with path with select") {
+    withTempPath { dir =>
+      withTable("t") {
+        val path = dir.getCanonicalPath
+
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (PATH '$path')
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+           """.stripMargin
+        )
+        sql("insert into t values (2, 3, 4)")
+        checkAnswer(table("t"), Seq(Row(1, 2, 3), Row(2, 3, 4)))
+        val catalogTable = sharedState.externalCatalog.getTable("default", "t")
+        // there should not be a lowercase key 'path' now
+        assert(catalogTable.storage.serdeProperties.get("path").isEmpty)
+        assert(catalogTable.storage.serdeProperties.get("PATH").isDefined)
+      }
+    }
+  }
 }
-- 
GitLab