diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 695ba1234d45836d97608e4120fd70bf284d5d84..d10fa2c9ff24744530db76e99c8899ba5f20460a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
-import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.command.{DDLUtils, RunnableCommand}
 import org.apache.spark.sql.types._
 
 case class CreateTable(
@@ -65,6 +65,11 @@ case class CreateTempViewUsing(
   }
 
   def run(sparkSession: SparkSession): Seq[Row] = {
+    if (provider.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, " +
+        "you can't use it with CREATE TEMP VIEW USING")
+    }
+
     val dataSource = DataSource(
       sparkSession,
       userSpecifiedSchema = userSpecifiedSchema,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 6888dece26de1e58a601b5e2cff157671291f4ce..d553d44cb3a1478b09f1e71da35db95b031d2526 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -49,7 +49,7 @@ class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
         // will catch it and return the original plan, so that the analyzer can report table not
         // found later.
         val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
-        if (!isFileFormat) {
+        if (!isFileFormat || dataSource.className.toLowerCase == DDLUtils.HIVE_PROVIDER) {
           throw new AnalysisException("Unsupported data source type for direct query on files: " +
             s"${u.tableIdentifier.database.get}")
         }
@@ -110,11 +110,6 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
         throw new AnalysisException("Saving data into a view is not allowed.")
       }
 
-      if (DDLUtils.isHiveTable(existingTable)) {
-        throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
-          "not supported yet. Please use the insertInto() API as an alternative.")
-      }
-
       // Check if the specified data source match the data source of the existing table.
       val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
       val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
diff --git a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 32aa13ff257a6564cbcdb55dadf1f6a57371a31c..e7d762fbebe7616a5f396a5c03e0bae1adcf9b06 100644
--- a/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/hive/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,2 @@
 org.apache.spark.sql.hive.orc.OrcFileFormat
+org.apache.spark.sql.hive.execution.HiveFileFormat
\ No newline at end of file
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 6cde783c5ae3274e1aa5dc713d0620cfdc465eec..9a7111aa3b8b035157d30f122eaa6bf12f87c285 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -88,18 +88,11 @@ class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
 
     case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
-      // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
-      // tables yet.
-      if (mode == SaveMode.Append) {
-        throw new AnalysisException(
-          "CTAS for hive serde tables does not support append semantics.")
-      }
-
       val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
       CreateHiveTableAsSelectCommand(
         tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
         query,
-        mode == SaveMode.Ignore)
+        mode)
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 0d30053937a20066621e0002f5a5bf068835a1d8..2c754d7fbf9dbc30af9e12923bf523a4fe5152c0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -31,13 +31,12 @@ import org.apache.spark.sql.hive.MetastoreRelation
  *
  * @param tableDesc the Table Describe, which may contains serde, storage handler etc.
  * @param query the query whose result will be insert into the new relation
- * @param ignoreIfExists allow continue working if it's already exists, otherwise
- *                      raise exception
+ * @param mode SaveMode
  */
 case class CreateHiveTableAsSelectCommand(
     tableDesc: CatalogTable,
     query: LogicalPlan,
-    ignoreIfExists: Boolean)
+    mode: SaveMode)
   extends RunnableCommand {
 
   private val tableIdentifier = tableDesc.identifier
@@ -67,7 +66,7 @@ case class CreateHiveTableAsSelectCommand(
         withFormat
       }
 
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
 
       // Get the Metastore Relation
       sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -80,11 +79,18 @@ case class CreateHiveTableAsSelectCommand(
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
-      if (ignoreIfExists) {
-        // table already exists, will do nothing, to keep consistent with Hive
-      } else {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
+
+      if (mode == SaveMode.ErrorIfExists) {
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
+      }
+      sparkSession.sessionState.executePlan(InsertIntoTable(
+        metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index cc2b60bc419634731c43780c503c0fd709f71445..ac735e8b383f6c7668288ffbd77e9d7065595430 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableJobConf
 
@@ -43,7 +44,13 @@ import org.apache.spark.util.SerializableJobConf
  *
  * TODO: implement the read logic.
  */
-class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+class HiveFileFormat(fileSinkConf: FileSinkDesc)
+  extends FileFormat with DataSourceRegister with Logging {
+
+  def this() = this(null)
+
+  override def shortName(): String = "hive"
+
   override def inferSchema(
       sparkSession: SparkSession,
       options: Map[String, String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f0e2c9369bd05fbe572cfa8f37f7f7b0ed455a8d..c262095df65b4d44fba3732e84015912b830501b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -419,12 +419,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sql(s"CREATE TABLE $tableName STORED AS SEQUENCEFILE AS SELECT 1 AS key, 'abc' AS value")
 
       val df = sql(s"SELECT key, value FROM $tableName")
-      val e = intercept[AnalysisException] {
-        df.write.mode(SaveMode.Append).saveAsTable(tableName)
-      }.getMessage
-      assert(e.contains("Saving data in the Hive serde table default.tab1 is not supported " +
-        "yet. Please use the insertInto() API as an alternative."))
-
       df.write.insertInto(tableName)
       checkAnswer(
         sql(s"SELECT * FROM $tableName"),
@@ -1167,8 +1161,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
   test("create a temp view using hive") {
     val tableName = "tab1"
-    withTable (tableName) {
-      val e = intercept[ClassNotFoundException] {
+    withTable(tableName) {
+      val e = intercept[AnalysisException] {
         sql(
           s"""
              |CREATE TEMPORARY VIEW $tableName
@@ -1176,7 +1170,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
              |USING hive
            """.stripMargin)
       }.getMessage
-      assert(e.contains("Failed to find data source: hive"))
+      assert(e.contains("Hive data source can only be used with tables, you can't use it with " +
+        "CREATE TEMP VIEW USING"))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a77c68339d43fb667aeaa7f44f80bbc75079dc10..282718345674768b517d183ce0c5187f79cd3c74 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1326,7 +1326,7 @@ class HiveDDLSuite
   }
 
   test("create hive serde table with DataFrameWriter.saveAsTable") {
-    withTable("t", "t2") {
+    withTable("t", "t1") {
       Seq(1 -> "a").toDF("i", "j")
         .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
       checkAnswer(spark.table("t"), Row(1, "a"))
@@ -1357,11 +1357,8 @@ class HiveDDLSuite
       assert(table.storage.serde ==
         Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
 
-      sql("INSERT INTO t SELECT 2, 'b'")
-      checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
-
       val e2 = intercept[AnalysisException] {
-        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+        Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t1")
       }
       assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
 
@@ -1372,6 +1369,35 @@ class HiveDDLSuite
     }
   }
 
+  test("append data to hive serde table") {
+    withTable("t", "t1") {
+      Seq(1 -> "a").toDF("i", "j")
+        .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a"))
+
+      sql("INSERT INTO t SELECT 2, 'b'")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+      Seq(3 -> "c").toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil)
+
+      Seq("c" -> 3).toDF("i", "j")
+        .write.format("hive").mode("append").saveAsTable("t")
+      checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Row(3, "c")
+        :: Row(null, "3") :: Nil)
+
+      Seq(4 -> "d").toDF("i", "j").write.saveAsTable("t1")
+
+      val e = intercept[AnalysisException] {
+        Seq(5 -> "e").toDF("i", "j")
+          .write.format("hive").mode("append").saveAsTable("t1")
+      }
+      assert(e.message.contains("The format of the existing table default.t1 is " +
+        "`ParquetFileFormat`. It doesn't match the specified format `HiveFileFormat`."))
+    }
+  }
+
   test("create partitioned hive serde table as select") {
     withTable("t", "t1") {
       withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1a28c4c84a8633fe9d9589191ccf02893b5fc262..20f30e48aba44776c76467d0817fbbd0105b18ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1461,6 +1461,23 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }
 
+  test("run sql directly on files - hive") {
+    withTempPath(f => {
+      spark.range(100).toDF.write.parquet(f.getCanonicalPath)
+
+      var e = intercept[AnalysisException] {
+        sql(s"select id from hive.`${f.getCanonicalPath}`")
+      }
+      assert(e.message.contains("Unsupported data source type for direct query on files: hive"))
+
+      // data source type is case insensitive
+      e = intercept[AnalysisException] {
+        sql(s"select id from HIVE.`${f.getCanonicalPath}`")
+      }
+      assert(e.message.contains("Unsupported data source type for direct query on files: HIVE"))
+    })
+  }
+
   test("SPARK-8976 Wrong Result for Rollup #1") {
     checkAnswer(sql(
       "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),