Skip to content
Snippets Groups Projects
Commit e83f13e8 authored by Cheng Hao's avatar Cheng Hao Committed by Michael Armbrust
Browse files

[SPARK-4152] [SQL] Avoid changing data in CTAS when the table already exists

CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- throws an exception
CREATE TABLE if not exists t1 AS SELECT key FROM src; -- expected to do nothing; currently it overwrites t1, which is incorrect.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed
parent c238fb42
No related branches found
No related tags found
No related merge requests found
......@@ -28,6 +28,8 @@ trait Catalog {
def caseSensitive: Boolean
/**
 * Reports whether a table named `tableName` exists in this catalog.
 *
 * @param db optional database name; how a missing value is resolved is left to the
 *           implementing catalog
 * @param tableName name of the table to look for
 */
def tableExists(db: Option[String], tableName: String): Boolean
def lookupRelation(
databaseName: Option[String],
tableName: String,
......@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
tables.clear()
}
/**
 * Reports whether `tableName` has an entry in this catalog's `tables` map.
 *
 * Both arguments are passed through `processDatabaseAndTableName` first; only the
 * resulting table name is used as the lookup key, the database part is not consulted.
 */
override def tableExists(db: Option[String], tableName: String): Boolean = {
  val (_, lookupKey) = processDatabaseAndTableName(db, tableName)
  tables.get(lookupKey).isDefined
}
override def lookupRelation(
databaseName: Option[String],
tableName: String,
......@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
// TODO: This doesn't work when the database changes...
val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
/**
 * Reports whether the table is known either to `overrides` or to the underlying catalog.
 *
 * The (database, table) pair returned by `processDatabaseAndTableName` is looked up in
 * `overrides` first; the wrapped catalog (via `super`) is only asked when no override is
 * registered under that key.
 */
abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
  val overrideKey = processDatabaseAndTableName(db, tableName)
  // `||` short-circuits, so super is consulted only on an override miss.
  overrides.contains(overrideKey) || super.tableExists(db, tableName)
}
abstract override def lookupRelation(
databaseName: Option[String],
tableName: String,
......@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
val caseSensitive: Boolean = true
/**
 * Not supported by this catalog: every call fails.
 *
 * @throws UnsupportedOperationException always
 */
def tableExists(db: Option[String], tableName: String): Boolean =
  throw new UnsupportedOperationException
def lookupRelation(
databaseName: Option[String],
tableName: String,
......
......@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val caseSensitive: Boolean = false
/**
 * Reports whether the metastore client returns a table for the given name.
 *
 * @param db database to look in; when empty, `hive.sessionState.getCurrentDatabase` is used
 * @param tableName name of the table to look for
 * @return true when `client.getTable` yields a non-null result
 */
def tableExists(db: Option[String], tableName: String): Boolean = {
  val effectiveDb = db.getOrElse(hive.sessionState.getCurrentDatabase)
  val (databaseName, tblName) = processDatabaseAndTableName(effectiveDb, tableName)
  // NOTE(review): the `false` is presumably Hive's throwException flag, i.e. a missing
  // table yields null instead of an exception — confirm against the Hive client API.
  Option(client.getTable(databaseName, tblName, false)).isDefined
}
def lookupRelation(
db: Option[String],
tableName: String,
......
......@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
if (sc.catalog.tableExists(Some(database), tableName)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
throw
new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
}
} else {
sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
}
Seq.empty[Row]
}
......
......@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
// expect the string => integer for field key cause the table ctas4 already existed.
// do nothing cause the table ctas4 already existed.
sql(
"""CREATE TABLE IF NOT EXISTS ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
......@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
SELECT key, value
FROM src
ORDER BY key, value""").collect().toSeq)
intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
sql(
"""CREATE TABLE ctas4 AS
| SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
}
checkAnswer(
sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
checkExistence(sql("DESC EXTENDED ctas2"), true,
"name:key", "type:string", "name:value", "ctas2",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment