From bca79c823024c41731ec89f96a3722d7b1c99639 Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Tue, 30 Aug 2016 17:27:00 +0800
Subject: [PATCH] [SPARK-17234][SQL] Table Existence Checking when Index Table
 with the Same Name Exists

### What changes were proposed in this pull request?
Hive Index tables are not supported by Spark SQL. Thus, we issue an exception when users try to access Hive Index tables. When the internal function `tableExists` tries to access Hive Index tables, it always gets the same error message: ```Hive index table is not supported```. This message could be confusing to users, since their SQL operations could be completely unrelated to Hive Index tables. For example, when users try to alter a table to a new name and there exists an index table with the same name, the expected exception should be a `TableAlreadyExistsException`.

This PR made the following changes:
- Introduced a new `AnalysisException` type: `SQLFeatureNotSupportedException`. When users try to access an `Index Table`, we will issue a `SQLFeatureNotSupportedException`.
- `tableExists` returns `true` when hitting a `SQLFeatureNotSupportedException` and the feature is `Hive index table`.
- Add a checking `requireTableNotExists` for `SessionCatalog`'s `createTable` API; otherwise, the current implementation relies on the Hive's internal checking.

### How was this patch tested?
Added a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14801 from gatorsmile/tableExists.
---
 .../catalog/ExternalCatalogSuite.scala        | 10 ++++++
 .../spark/sql/hive/HiveExternalCatalog.scala  |  7 +++-
 .../spark/sql/hive/client/HiveClient.scala    |  3 ++
 .../sql/hive/client/HiveClientImpl.scala      |  4 +++
 .../spark/sql/hive/client/VersionsSuite.scala |  6 ++++
 .../sql/hive/execution/HiveDDLSuite.scala     | 33 +++++++++++++++++++
 6 files changed, 62 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 54365fd978..19f8665383 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
@@ -162,6 +163,15 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(actual.tableType === CatalogTableType.EXTERNAL)
   }
 
+  test("create table when the table already exists") {
+    val catalog = newBasicCatalog()
+    assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
+    val table = newTable("tbl1", "db2")
+    intercept[TableAlreadyExistsException] {
+      catalog.createTable(table, ignoreIfExists = false)
+    }
+  }
+
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7f50e38d30..ed87ac3c3e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -30,6 +30,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
@@ -171,9 +172,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       ignoreIfExists: Boolean): Unit = withClient {
     assert(tableDefinition.identifier.database.isDefined)
     val db = tableDefinition.identifier.database.get
+    val table = tableDefinition.identifier.table
     requireDbExists(db)
     verifyTableProperties(tableDefinition)
 
+    if (tableExists(db, table) && !ignoreIfExists) {
+      throw new TableAlreadyExistsException(db = db, table = table)
+    }
     // Before saving data source table metadata into Hive metastore, we should:
     //  1. Put table schema, partition column names and bucket specification in table properties.
     //  2. Check if this table is hive compatible
@@ -450,7 +455,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
-    client.getTableOption(db, table).isDefined
+    client.tableExists(db, table)
   }
 
   override def listTables(db: String): Seq[String] = withClient {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 6f009d714b..dc74fa257a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -68,6 +68,9 @@ private[hive] trait HiveClient {
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]
 
+  /** Return whether a table/view with the specified name exists. */
+  def tableExists(dbName: String, tableName: String): Boolean
+
   /** Returns the specified table, or throws [[NoSuchTableException]]. */
   final def getTable(dbName: String, tableName: String): CatalogTable = {
     getTableOption(dbName, tableName).getOrElse(throw new NoSuchTableException(dbName, tableName))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b45ad30dca..dd982192a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -347,6 +347,10 @@ private[hive] class HiveClientImpl(
     client.getDatabasesByPattern(pattern).asScala
   }
 
+  override def tableExists(dbName: String, tableName: String): Boolean = withHiveState {
+    Option(client.getTable(dbName, tableName, false /* do not throw exception */)).nonEmpty
+  }
+
   override def getTableOption(
       dbName: String,
       tableName: String): Option[CatalogTable] = withHiveState {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a2509f2a75..10b6cd1024 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -218,6 +218,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
         holdDDLTime = false)
     }
 
+    test(s"$version: tableExists") {
+      // No exception should be thrown
+      assert(client.tableExists("default", "src"))
+      assert(!client.tableExists("default", "nonexistent"))
+    }
+
     test(s"$version: getTable") {
       // No exception should be thrown
       client.getTable("default", "src")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9019333d76..58c43ebcae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -24,8 +24,10 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -675,6 +677,37 @@ class HiveDDLSuite
     }
   }
 
+  test("create table with the same name as an index table") {
+    val tabName = "tab1"
+    val indexName = tabName + "_index"
+    withTable(tabName) {
+      // Spark SQL does not support creating index. Thus, we have to use Hive client.
+      val client = spark.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client
+      sql(s"CREATE TABLE $tabName(a int)")
+
+      try {
+        client.runSqlHive(
+          s"CREATE INDEX $indexName ON TABLE $tabName (a) AS 'COMPACT' WITH DEFERRED REBUILD")
+        val indexTabName =
+          spark.sessionState.catalog.listTables("default", s"*$indexName*").head.table
+        intercept[TableAlreadyExistsException] {
+          sql(s"CREATE TABLE $indexTabName(b int)")
+        }
+        intercept[TableAlreadyExistsException] {
+          sql(s"ALTER TABLE $tabName RENAME TO $indexTabName")
+        }
+
+        // When tableExists is not invoked, we still can get an AnalysisException
+        val e = intercept[AnalysisException] {
+          sql(s"DESCRIBE $indexTabName")
+        }.getMessage
+        assert(e.contains("Hive index table is not supported."))
+      } finally {
+        client.runSqlHive(s"DROP INDEX IF EXISTS $indexName ON $tabName")
+      }
+    }
+  }
+
   test("desc table for data source table - no user-defined schema") {
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {
-- 
GitLab