From 84b5b16ea6c9816c70f7471a50eb5e4acb7fb1a1 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Mon, 21 Aug 2017 15:09:02 -0700
Subject: [PATCH] [SPARK-21617][SQL] Store correct table metadata when altering
 schema in Hive metastore.

For Hive tables, the current "replace the schema" code is the correct path,
except that an exception in that path should result in an error rather than
a retry through a different code path.
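
A condensed sketch (not standalone code) of the control flow this gives
`HiveExternalCatalog.alterTableSchema`, mirroring the hunk below:

```scala
if (isDatasourceTable(rawTable)) {
  try {
    // First attempt: write the table with the new schema, Hive-compatible if possible.
    client.alterTable(withNewSchema)
  } catch {
    case NonFatal(e) =>
      // Fall back to the Spark SQL specific format: the full schema stays in the
      // table properties, and Hive only sees the partition columns.
      logWarning(s"Could not alter schema of table ${rawTable.identifier.quotedString} " +
        "in a Hive compatible way. Updating Hive metastore in Spark SQL specific format.", e)
      client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
  }
} else {
  // Hive tables: a single alterTable call; any exception now surfaces as an error
  // instead of triggering the fallback.
  client.alterTable(withNewSchema)
}
```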

For data source tables, Spark may generate a Hive-incompatible table; for that
to keep working with Hive 2.1, the detection of data source tables needs to be
fixed in the Hive client so that it also considers the raw tables used by code
such as `alterTableSchema`.
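
The detection (shared with `HiveClientImpl`) now looks at both the provider
field and the raw table properties; a minimal sketch of the new helper,
assuming `DATASOURCE_PROVIDER` is the `spark.sql.sources.provider` property
key from `HiveExternalCatalog`:

```scala
// Raw tables read straight from the metastore may carry the provider only in
// their table properties, so check both places before deciding.
private[spark] def isDatasourceTable(table: CatalogTable): Boolean = {
  val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
  provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
}
```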

Tested with existing and added unit tests (plus internal tests with a 2.1 metastore).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18849 from vanzin/SPARK-21617.
---
 .../sql/execution/command/DDLSuite.scala      |  15 +--
 .../spark/sql/hive/HiveExternalCatalog.scala  |  55 +++++---
 .../sql/hive/client/HiveClientImpl.scala      |   3 +-
 .../hive/execution/Hive_2_1_DDLSuite.scala    | 126 ++++++++++++++++++
 4 files changed, 171 insertions(+), 28 deletions(-)
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 9332f77343..ad6fc20df1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2357,18 +2357,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
             }.getMessage
             assert(e.contains("Found duplicate column(s)"))
           } else {
-            if (isUsingHiveMetastore) {
-              // hive catalog will still complains that c1 is duplicate column name because hive
-              // identifiers are case insensitive.
-              val e = intercept[AnalysisException] {
-                sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-              }.getMessage
-              assert(e.contains("HiveException"))
-            } else {
-              sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-              assert(spark.table("t1").schema
-                .equals(new StructType().add("c1", IntegerType).add("C1", StringType)))
-            }
+            sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+            assert(spark.table("t1").schema ==
+              new StructType().add("c1", IntegerType).add("C1", StringType))
           }
         }
       }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 547447b31f..bdbb8bccbc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * should interpret these special data source properties and restore the original table metadata
    * before returning it.
    */
-  private def getRawTable(db: String, table: String): CatalogTable = withClient {
+  private[hive] def getRawTable(db: String, table: String): CatalogTable = withClient {
     client.getTable(db, table)
   }
 
@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
    * can be used as table properties later.
    */
   private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, String] = {
+    tableMetaToTableProps(table, table.schema)
+  }
+
+  private def tableMetaToTableProps(
+      table: CatalogTable,
+      schema: StructType): mutable.Map[String, String] = {
     val partitionColumns = table.partitionColumnNames
     val bucketSpec = table.bucketSpec
 
@@ -397,7 +403,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     // property. In this case, we split the JSON string and store each part as a separate table
     // property.
     val threshold = conf.get(SCHEMA_STRING_LENGTH_THRESHOLD)
-    val schemaJsonString = table.schema.json
+    val schemaJsonString = schema.json
     // Split the JSON string.
     val parts = schemaJsonString.grouped(threshold).toSeq
     properties.put(DATASOURCE_SCHEMA_NUMPARTS, parts.size.toString)
@@ -615,20 +621,29 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient {
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
-    val withNewSchema = rawTable.copy(schema = schema)
-    verifyColumnNames(withNewSchema)
     // Add table metadata such as table schema, partition columns, etc. to table properties.
-    val updatedTable = withNewSchema.copy(
-      properties = withNewSchema.properties ++ tableMetaToTableProps(withNewSchema))
-    try {
-      client.alterTable(updatedTable)
-    } catch {
-      case NonFatal(e) =>
-        val warningMessage =
-          s"Could not alter schema of table  ${rawTable.identifier.quotedString} in a Hive " +
-            "compatible way. Updating Hive metastore in Spark SQL specific format."
-        logWarning(warningMessage, e)
-        client.alterTable(updatedTable.copy(schema = updatedTable.partitionSchema))
+    val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema)
+    val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema)
+    verifyColumnNames(withNewSchema)
+
+    if (isDatasourceTable(rawTable)) {
+      // For data source tables, first try to write the table with the new schema; if that does
+      // not work, fall back to writing the updated properties with only the partition schema.
+      // This is a simplified version of what createDataSourceTable() does, and may leave the
+      // table in a state unreadable by Hive (for example, if the schema does not match the data
+      // source schema, or does not match the storage descriptor).
+      try {
+        client.alterTable(withNewSchema)
+      } catch {
+        case NonFatal(e) =>
+          val warningMessage =
+            s"Could not alter schema of table  ${rawTable.identifier.quotedString} in a Hive " +
+              "compatible way. Updating Hive metastore in Spark SQL specific format."
+          logWarning(warningMessage, e)
+          client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema))
+      }
+    } else {
+      client.alterTable(withNewSchema)
     }
   }
 
@@ -1351,4 +1366,14 @@ object HiveExternalCatalog {
         getColumnNamesByType(metadata.properties, "sort", "sorting columns"))
     }
   }
+
+  /**
+   * Detects a data source table. This checks both the table provider and the table properties,
+   * unlike DDLUtils which just checks the former.
+   */
+  private[spark] def isDatasourceTable(table: CatalogTable): Boolean = {
+    val provider = table.provider.orElse(table.properties.get(DATASOURCE_PROVIDER))
+    provider.isDefined && provider != Some(DDLUtils.HIVE_PROVIDER)
+  }
+
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 995280e0e9..7c0b9bf19b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -50,6 +50,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.execution.QueryExecutionException
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.client.HiveClientImpl._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{CircularBuffer, Utils}
@@ -883,7 +884,7 @@ private[hive] object HiveClientImpl {
     }
     // after SPARK-19279, it is not allowed to create a hive table with an empty schema,
     // so here we should not add a default col schema
-    if (schema.isEmpty && DDLUtils.isDatasourceTable(table)) {
+    if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) {
       // This is a hack to preserve existing behavior. Before Spark 2.0, we do not
       // set a default serde here (this was done in Hive), and so if the user provides
       // an empty schema Hive would automatically populate the schema with a single
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
new file mode 100644
index 0000000000..5c248b9acd
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.language.existentials
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.types._
+import org.apache.spark.tags.ExtendedHiveTest
+import org.apache.spark.util.Utils
+
+/**
+ * A separate set of DDL tests that uses Hive 2.1 libraries, which behave a little differently
+ * from the built-in ones.
+ */
+@ExtendedHiveTest
+class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with BeforeAndAfterEach
+  with BeforeAndAfterAll {
+
+  // Create a custom HiveExternalCatalog instance with the desired configuration. We cannot
+  // use SparkSession here since there's already an active one managed by the TestHive object.
+  private var catalog = {
+    val warehouse = Utils.createTempDir()
+    val metastore = Utils.createTempDir()
+    metastore.delete()
+    val sparkConf = new SparkConf()
+      .set(SparkLauncher.SPARK_MASTER, "local")
+      .set(WAREHOUSE_PATH.key, warehouse.toURI().toString())
+      .set(CATALOG_IMPLEMENTATION.key, "hive")
+      .set(HiveUtils.HIVE_METASTORE_VERSION.key, "2.1")
+      .set(HiveUtils.HIVE_METASTORE_JARS.key, "maven")
+
+    val hadoopConf = new Configuration()
+    hadoopConf.set("hive.metastore.warehouse.dir", warehouse.toURI().toString())
+    hadoopConf.set("javax.jdo.option.ConnectionURL",
+      s"jdbc:derby:;databaseName=${metastore.getAbsolutePath()};create=true")
+    // These options are needed since the defaults in Hive 2.1 cause exceptions with an
+    // empty metastore db.
+    hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+    hadoopConf.set("hive.metastore.schema.verification", "false")
+
+    new HiveExternalCatalog(sparkConf, hadoopConf)
+  }
+
+  override def afterEach(): Unit = {
+    catalog.listTables("default").foreach { t =>
+      catalog.dropTable("default", t, true, false)
+    }
+    spark.sessionState.catalog.reset()
+  }
+
+  override def afterAll(): Unit = {
+    catalog = null
+  }
+
+  test("SPARK-21617: ALTER TABLE for non-compatible DataSource tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) USING json",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))),
+      hiveCompatible = false)
+  }
+
+  test("SPARK-21617: ALTER TABLE for Hive-compatible DataSource tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) USING parquet",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
+  }
+
+  test("SPARK-21617: ALTER TABLE for Hive tables") {
+    testAlterTable(
+      "t1",
+      "CREATE TABLE t1 (c1 int) STORED AS parquet",
+      StructType(Array(StructField("c1", IntegerType), StructField("c2", IntegerType))))
+  }
+
+  test("SPARK-21617: ALTER TABLE with incompatible schema on Hive-compatible table") {
+    val exception = intercept[AnalysisException] {
+      testAlterTable(
+        "t1",
+        "CREATE TABLE t1 (c1 string) USING parquet",
+        StructType(Array(StructField("c2", IntegerType))))
+    }
+    assert(exception.getMessage().contains("types incompatible with the existing columns"))
+  }
+
+  private def testAlterTable(
+      tableName: String,
+      createTableStmt: String,
+      updatedSchema: StructType,
+      hiveCompatible: Boolean = true): Unit = {
+    spark.sql(createTableStmt)
+    val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName)
+    catalog.createTable(oldTable, true)
+    catalog.alterTableSchema("default", tableName, updatedSchema)
+
+    val updatedTable = catalog.getTable("default", tableName)
+    assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames)
+  }
+
+}
-- 
GitLab