diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c133dda13e3fa0c8c946223f574ce49d2712d6ea..fc8d8c3667901c81657d907e0b7b9b380a3c8b48 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -272,6 +272,21 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * A rule to check for operations that are supported only when Hive support is enabled.
+ */
+object HiveOnlyCheck extends (LogicalPlan => Unit) {
+  def apply(plan: LogicalPlan): Unit = {
+    plan.foreach {
+      case CreateTable(tableDesc, _, Some(_))
+          if tableDesc.provider.get == "hive" =>
+        throw new AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
+
+      case _ => // OK
+    }
+  }
+}
+
 /**
  * A rule to do various checks before inserting into or writing to a data source table.
  */
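
For context on what this rule is: an entry in `extendedCheckRules` is just a function from `LogicalPlan` to `Unit` that throws `AnalysisException` on an invalid plan. Below is a minimal, self-contained sketch of the same shape, using simplified stand-in types (these are not Spark's actual classes):

// Stand-in types for illustration only; not Spark's actual classes.
case class AnalysisException(message: String) extends Exception(message)

sealed trait LogicalPlan {
  def children: Seq[LogicalPlan]
  // Pre-order traversal over the plan tree, mirroring Spark's TreeNode.foreach.
  def foreach(f: LogicalPlan => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }
}

case class CatalogTable(provider: Option[String])

case class CreateTable(tableDesc: CatalogTable, query: Option[LogicalPlan]) extends LogicalPlan {
  def children: Seq[LogicalPlan] = query.toSeq
}

case object OneRowRelation extends LogicalPlan {
  def children: Seq[LogicalPlan] = Nil
}

// Same shape as the rule in the patch: a CTAS whose provider is "hive"
// is rejected; everything else passes.
object HiveOnlyCheck extends (LogicalPlan => Unit) {
  def apply(plan: LogicalPlan): Unit = plan.foreach {
    case CreateTable(tableDesc, Some(_)) if tableDesc.provider.contains("hive") =>
      throw AnalysisException("Hive support is required to use CREATE Hive TABLE AS SELECT")
    case _ => // OK
  }
}

object Demo extends App {
  // Passes: data source CTAS with an explicit provider.
  HiveOnlyCheck(CreateTable(CatalogTable(Some("parquet")), Some(OneRowRelation)))
  // Throws AnalysisException: Hive CTAS without Hive support.
  HiveOnlyCheck(CreateTable(CatalogTable(Some("hive")), Some(OneRowRelation)))
}
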
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 052bce092369596ec6ea63f268a8844a4d836a4e..ab27381c0600d9dd471d74196e52a0d4055f69bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -117,7 +117,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
         DataSourceAnalysis(conf) ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
 
-      override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
+      override val extendedCheckRules =
+        Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
     }
   }
 
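As for where these rules run (a simplified assumption about placement, not Spark's actual CheckAnalysis code): the analyzer invokes each entry in `extendedCheckRules` on the fully analyzed plan, and any rule that throws aborts the query. Reusing the stand-in `LogicalPlan` from the sketch above:

object CheckRunner {
  // Applies each registered check rule to the analyzed plan, in order.
  // A rule like PreWriteCheck or HiveOnlyCheck signals a violation by
  // throwing AnalysisException, which aborts analysis of the query.
  def runExtendedChecks(
      plan: LogicalPlan,
      checkRules: Seq[LogicalPlan => Unit]): Unit = {
    checkRules.foreach(rule => rule(plan))
  }
}
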
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 53376c56f185845359fda90f226d1fa138d3c8bc..0eb3f2002d0bcf5d2264ed84e915ea252ccdbaf6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1578,6 +1578,34 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       "WITH SERDEPROPERTIES ('spark.sql.sources.me'='anything')")
   }
 
+  test("Create Hive Table As Select") {
+    import testImplicits._
+    withTable("t", "t1") {
+      var e = intercept[AnalysisException] {
+        sql("CREATE TABLE t SELECT 1 as a, 1 as b")
+      }.getMessage
+      assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+
+      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+      e = intercept[AnalysisException] {
+        sql("CREATE TABLE t SELECT a, b from t1")
+      }.getMessage
+      assert(e.contains("Hive support is required to use CREATE Hive TABLE AS SELECT"))
+    }
+  }
+
+  test("Create Data Source Table As Select") {
+    import testImplicits._
+    withTable("t", "t1", "t2") {
+      sql("CREATE TABLE t USING parquet SELECT 1 as a, 1 as b")
+      checkAnswer(spark.table("t"), Row(1, 1) :: Nil)
+
+      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+      sql("CREATE TABLE t2 USING parquet SELECT a, b from t1")
+      checkAnswer(spark.table("t2"), spark.table("t1"))
+    }
+  }
+
   test("drop current database") {
     sql("CREATE DATABASE temp")
     sql("USE temp")