From e83f13e8d37ca33f4e183e977d077221b90c6025 Mon Sep 17 00:00:00 2001
From: Cheng Hao <hao.cheng@intel.com>
Date: Mon, 3 Nov 2014 13:59:43 -0800
Subject: [PATCH] [SPARK-4152] [SQL] Avoid data change in CTAS while table
 already existed
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- throws an exception
CREATE TABLE if not exists t1 AS SELECT key FROM src; -- expected to do nothing; currently it overwrites t1, which is incorrect.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed
---
 .../spark/sql/catalyst/analysis/Catalog.scala | 22 +++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  6 +++++
 .../hive/execution/CreateTableAsSelect.scala  | 12 +++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala    |  9 ++++++--
 4 files changed, 46 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91ba0..0415d74bd8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
+  def tableExists(db: Option[String], tableName: String): Boolean
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
     tables.clear()
   }
 
+  /** Reports whether `tables` holds an entry for the processed table name. */
+  override def tableExists(db: Option[String], tableName: String): Boolean = {
+    // processDatabaseAndTableName yields a (database, table) pair; only the
+    // table half is used as the lookup key here, so the database is ignored.
+    val (_, tblName) = processDatabaseAndTableName(db, tableName)
+    tables.get(tblName).isDefined
+  }
+
   override def lookupRelation(
       databaseName: Option[String],
       tableName: String,
@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
+  /** Checks the in-memory `overrides` first and only then defers to `super`. */
+  abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
+    // The (database, table) pair from processDatabaseAndTableName is the map key;
+    // on a miss, `super` receives the caller's original, unprocessed arguments.
+    val overrideKey = processDatabaseAndTableName(db, tableName)
+    overrides.contains(overrideKey) || super.tableExists(db, tableName)
+  }
+
   abstract override def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
+  // Always throws: this catalog does not support existence checks.
+  def tableExists(db: Option[String], tableName: String): Boolean =
+    throw new UnsupportedOperationException
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a07aa..0baf4c9f8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+    val dbOrCurrent = db.getOrElse(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = processDatabaseAndTableName(dbOrCurrent, tableName)
+    Option(client.getTable(databaseName, tblName, false)).isDefined // null => no such table
+  }
+
   def lookupRelation(
       db: Option[String],
       tableName: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 2fce414734..3d24d87bc3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+    if (sc.catalog.tableExists(Some(database), tableName)) {
+      if (allowExisting) {
+        // table already exists, will do nothing, to keep consistent with Hive
+      } else {
+        throw
+          new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+      }
+    } else {
+      sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+    }
+
     Seq.empty[Row]
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 76a0ec01a6..e9b1943ff8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
-    // expect the string => integer for field key cause the table ctas4 already existed.
+    // does nothing because the table ctas4 already exists.
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
           SELECT key, value
           FROM src
           ORDER BY key, value""").collect().toSeq)
+    intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+      sql(
+        """CREATE TABLE ctas4 AS
+          | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+    }
     checkAnswer(
       sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
-      sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+      sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
 
     checkExistence(sql("DESC EXTENDED ctas2"), true,
       "name:key", "type:string", "name:value", "ctas2",
-- 
GitLab