diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91ba06122099657f6ad5fce7e5616589594..0415d74bd8141be0c58ac4f91be6a35e621de4df 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
+  def tableExists(db: Option[String], tableName: String): Boolean
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -82,6 +84,11 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
     tables.clear()
   }
 
+  override def tableExists(db: Option[String], tableName: String): Boolean = {
+    val (_, tblName) = processDatabaseAndTableName(db, tableName)
+    tables.contains(tblName)
+  }
+
   override def lookupRelation(
       databaseName: Option[String],
       tableName: String,
@@ -107,6 +114,11 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
+  abstract override def tableExists(db: Option[String], tableName: String): Boolean = {
+    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+    overrides.contains((dbName, tblName)) || super.tableExists(db, tableName)
+  }
+
   abstract override def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -149,6 +161,10 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+    throw new UnsupportedOperationException
+  }
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
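
The `abstract override` on `tableExists` in `OverrideCatalog` relies on Scala's stackable-trait pattern: the trait must be mixed into a concrete `Catalog`, and `super.tableExists` resolves to the next implementation in the linearization. A minimal sketch of how the pieces compose (the `registerTable` call comes from the surrounding `Catalog` trait, not shown in this hunk, and `somePlan` is a hypothetical placeholder):

```scala
// Sketch only: stack the override layer on top of a concrete catalog.
val catalog = new SimpleCatalog(caseSensitive = true) with OverrideCatalog

// A plan registered through the override layer is found there first...
catalog.registerTable(None, "temp", somePlan)  // somePlan: a LogicalPlan, hypothetical
catalog.tableExists(None, "temp")              // true, answered from `overrides`

// ...while unknown names fall through to SimpleCatalog via super.tableExists.
catalog.tableExists(None, "missing")           // false
```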
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a07aa2ead3ba56542ae471b628447af7d76..0baf4c9f8c7abb9906c495983603e45df33b454e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+    val (databaseName, tblName) = processDatabaseAndTableName(
+      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+    client.getTable(databaseName, tblName, false) != null
+  }
+
   def lookupRelation(
       db: Option[String],
       tableName: String,
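
In the Hive-backed implementation, the third argument to `client.getTable` is Hive's `throwException` flag: passing `false` makes a missing table come back as `null` rather than as an exception, and the `!= null` comparison turns that into the Boolean result. A slightly more explicit equivalent, sketched here purely for illustration:

```scala
// Sketch: wrapping the nullable Hive client result in an Option makes the
// null convention explicit; behavior matches the `!= null` check above.
def tableExists(db: Option[String], tableName: String): Boolean = {
  val (databaseName, tblName) = processDatabaseAndTableName(
    db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
  Option(client.getTable(databaseName, tblName, false /* throwException */)).isDefined
}
```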
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 2fce414734579540a0f4d8f6fdad4c0dbea98ea8..3d24d87bc3d38d1d9a78edfc3ebc29dfbc0b8ce5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+    if (sc.catalog.tableExists(Some(database), tableName)) {
+      if (allowExisting) {
+        // table already exists, do nothing to stay consistent with Hive
+      } else {
+        throw
+          new org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+      }
+    } else {
+      sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
+    }
+
     Seq.empty[Row]
   }
 
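With the existence check in place, CTAS now matches Hive's semantics: a plain `CREATE TABLE ... AS SELECT` against an existing table raises `AlreadyExistsException`, while `CREATE TABLE IF NOT EXISTS ... AS SELECT` silently does nothing. A hedged sketch of the resulting behavior (table name hypothetical; `intercept` is ScalaTest's, and the test change below exercises the same path):

```scala
sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")

// With IF NOT EXISTS, re-running the statement is a no-op, as in Hive.
sql("CREATE TABLE IF NOT EXISTS ctas_demo AS SELECT key, value FROM src")

// Without it, recreating an existing table now fails up front.
intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
  sql("CREATE TABLE ctas_demo AS SELECT key, value FROM src")
}
```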
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 76a0ec01a6075942ff6d40ee3f72bb5c44216635..e9b1943ff8db7c5747d9d8d7c6a43b58d7701322 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
-    // expect the string => integer for field key cause the table ctas4 already existed.
+    // do nothing since the table ctas4 already exists.
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
           SELECT key, value
           FROM src
           ORDER BY key, value""").collect().toSeq)
+    intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+      sql(
+        """CREATE TABLE ctas4 AS
+          | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+    }
     checkAnswer(
       sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
-      sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+      sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
 
     checkExistence(sql("DESC EXTENDED ctas2"), true,
       "name:key", "type:string", "name:value", "ctas2",