diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index b8099385a466b2a2d19a17cea7d68021d344bc90..15a5d79dcb08597f4d65334c9926692145dd64f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
@@ -87,8 +89,15 @@ case class CreateHiveTableAsSelectCommand(
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+      try {
+        sparkSession.sessionState.executePlan(InsertIntoTable(
+          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+      } catch {
+        case NonFatal(e) =>
+          // Drop the created table so a failed CTAS does not leave an empty table behind.
+          sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true)
+          throw e
+      }
     }
 
     Seq.empty[Row]
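
A minimal, self-contained sketch of the cleanup-on-failure pattern introduced above,
outside of Spark. The createTable/insertData/dropTable helpers here are hypothetical
stand-ins for the catalog and metastore calls, not Spark APIs:

import scala.util.control.NonFatal

object CtasCleanupSketch {
  // Hypothetical catalog operations standing in for the metastore calls.
  def createTable(name: String): Unit = println(s"created table $name")
  def insertData(name: String): Unit = throw new RuntimeException("insert failed")
  def dropTable(name: String): Unit = println(s"dropped table $name")

  def createTableAsSelect(name: String): Unit = {
    createTable(name)
    try {
      insertData(name)
    } catch {
      case NonFatal(e) =>
        // The table already exists when the insert fails; drop it so the
        // failed CTAS does not leave an empty table behind, then rethrow.
        dropTable(name)
        throw e
    }
  }

  def main(args: Array[String]): Unit = {
    try createTableAsSelect("tab")
    catch { case NonFatal(e) => println(s"CTAS failed as expected: ${e.getMessage}") }
  }
}
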
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 89f69c8e4d7f1941162fe579bf13b42ca63d4706..9d3c4cd3d570cad29222d135db3f26d127a33adb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -554,6 +554,21 @@ class HiveDDLSuite
     }
   }
 
+  test("Create Cataloged Table As Select - Drop Table After Runtime Exception") {
+    withTable("tab") {
+      intercept[RuntimeException] {
+        sql(
+          """
+            |CREATE TABLE tab
+            |STORED AS TEXTFILE
+            |SELECT 1 AS a, (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b
+          """.stripMargin)
+      }
+      // After hitting a runtime exception, the created table should have been dropped.
+      assert(!spark.sessionState.catalog.tableExists(TableIdentifier("tab")))
+    }
+  }
+
   test("desc table for data source table") {
     withTable("tab1") {
       val tabName = "tab1"
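
For context on why the test's query fails at runtime rather than analysis time: the
scalar subquery returns two rows, which Spark rejects only during execution, after the
CTAS has already created the table. A standalone sketch, assuming a local session
(catching NonFatal rather than pinning the exact exception class):

import scala.util.control.NonFatal

import org.apache.spark.sql.SparkSession

object ScalarSubquerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    try {
      // A scalar subquery producing more than one row fails at execution time.
      spark.sql(
        "SELECT (SELECT a FROM (SELECT 1 AS a UNION ALL SELECT 2 AS a) t) AS b").collect()
    } catch {
      case NonFatal(e) => println(s"runtime failure as expected: ${e.getMessage}")
    } finally {
      spark.stop()
    }
  }
}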