diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c1f8b2b3d9605bf57cb11a6e617b0393d5a127b2..aa335c4453dd11f6f537f576da0d00b5afc05578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
-  private val escapedIdentifier = "`(.+)`".r
 
+class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with BeforeAndAfterEach {
   override def afterEach(): Unit = {
     try {
       // drop all databases, tables and functions after each test
       spark.sessionState.catalog.reset()
     } finally {
-      Utils.deleteRecursively(new File("spark-warehouse"))
+      Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
       super.afterEach()
     }
   }
 
+  protected override def generateTable(
+      catalog: SessionCatalog,
+      name: TableIdentifier): CatalogTable = {
+    val storage =
+      CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
+    val metadata = new MetadataBuilder()
+      .putString("key", "value")
+      .build()
+    CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.EXTERNAL,
+      storage = storage,
+      schema = new StructType()
+        .add("col1", "int", nullable = true, metadata = metadata)
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "int"),
+      provider = Some("parquet"),
+      partitionColumnNames = Seq("a", "b"),
+      createTime = 0L,
+      tracksPartitionsInCatalog = true)
+  }
+
+  test("desc table for parquet data source table using in-memory catalog") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
+
+      checkAnswer(
+        sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
+        Row("a", "int", "test")
+      )
+    }
+  }
+
+  test("alter table: set location (datasource table)") {
+    testSetLocation(isDatasourceTable = true)
+  }
+
+  test("alter table: set properties (datasource table)") {
+    testSetProperties(isDatasourceTable = true)
+  }
+
+  test("alter table: unset properties (datasource table)") {
+    testUnsetProperties(isDatasourceTable = true)
+  }
+
+  test("alter table: set serde (datasource table)") {
+    testSetSerde(isDatasourceTable = true)
+  }
+
+  test("alter table: set serde partition (datasource table)") {
+    testSetSerdePartition(isDatasourceTable = true)
+  }
+
+  test("alter table: change column (datasource table)") {
+    testChangeColumn(isDatasourceTable = true)
+  }
+
+  test("alter table: add partition (datasource table)") {
+    testAddPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: drop partition (datasource table)") {
+    testDropPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: rename partition (datasource table)") {
+    testRenamePartitions(isDatasourceTable = true)
+  }
+
+  test("drop table - data source table") {
+    testDropTable(isDatasourceTable = true)
+  }
+
+  test("create a managed Hive source table") {
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+    val tabName = "tbl"
+    withTable(tabName) {
+      val e = intercept[AnalysisException] {
+        sql(s"CREATE TABLE $tabName (i INT, j STRING)")
+      }.getMessage
+      assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+    }
+  }
+
+  test("create an external Hive source table") {
+    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
+    withTempDir { tempDir =>
+      val tabName = "tbl"
+      withTable(tabName) {
+        val e = intercept[AnalysisException] {
+          sql(
+            s"""
+               |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
+               |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+               |LOCATION '${tempDir.toURI}'
+             """.stripMargin)
+        }.getMessage
+        assert(e.contains("Hive support is required to CREATE Hive TABLE"))
+      }
+    }
+  }
+
+  test("Create Hive Table As Select") {
+    import testImplicits._
+    withTable("t", "t1") {
+      var e = intercept[AnalysisException] {
+        sql("CREATE TABLE t SELECT 1 as a, 1 as b")
+      }.getMessage
+      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+
+      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
+      e = intercept[AnalysisException] {
+        sql("CREATE TABLE t SELECT a, b from t1")
+      }.getMessage
+      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
+    }
+  }
+
+}
+
+abstract class DDLSuite extends QueryTest with SQLTestUtils {
+
+  protected def isUsingHiveMetastore: Boolean = {
+    spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
+  }
+
+  protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
+
+  private val escapedIdentifier = "`(.+)`".r
+
+  protected def normalizeCatalogTable(table: CatalogTable): CatalogTable = table
+
+  private def normalizeSerdeProp(props: Map[String, String]): Map[String, String] = {
+    props.filterNot(p => Seq("serialization.format", "path").contains(p._1))
+  }
+
+  private def checkCatalogTables(expected: CatalogTable, actual: CatalogTable): Unit = {
+    assert(normalizeCatalogTable(actual) == normalizeCatalogTable(expected))
+  }
+
   /**
    * Strip backticks, if any, from the string.
    */
@@ -75,33 +216,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       ignoreIfExists = false)
   }
 
-  private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
-    val storage =
-      CatalogStorageFormat(
-        locationUri = Some(catalog.defaultTablePath(name)),
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        properties = Map())
-    val metadata = new MetadataBuilder()
-      .putString("key", "value")
-      .build()
-    CatalogTable(
-      identifier = name,
-      tableType = CatalogTableType.EXTERNAL,
-      storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
-      provider = Some("parquet"),
-      partitionColumnNames = Seq("a", "b"),
-      createTime = 0L,
-      tracksPartitionsInCatalog = true)
-  }
-
   private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
     catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
   }
@@ -115,6 +229,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     catalog.createPartitions(tableName, Seq(part), ignoreIfExists = false)
   }
 
+  private def getDBPath(dbName: String): URI = {
+    val warehousePath = s"file:${spark.sessionState.conf.warehousePath.stripPrefix("file:")}"
+    new Path(warehousePath, s"$dbName.db").toUri
+  }
+
   test("the qualified path of a database is stored in the catalog") {
     val catalog = spark.sessionState.catalog
 
@@ -138,11 +257,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     try {
       sql(s"CREATE DATABASE $dbName")
       val db1 = catalog.getDatabaseMetadata(dbName)
-      val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbName.db")
       assert(db1 == CatalogDatabase(
         dbName,
         "",
-        expectedLocation,
+        getDBPath(dbName),
         Map.empty))
       sql(s"DROP DATABASE $dbName CASCADE")
       assert(!catalog.databaseExists(dbName))
@@ -185,16 +303,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
         sql(s"CREATE DATABASE $dbName")
         val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
-        val expectedLocation = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
         assert(db1 == CatalogDatabase(
           dbNameWithoutBackTicks,
           "",
-          expectedLocation,
+          getDBPath(dbNameWithoutBackTicks),
           Map.empty))
 
-        intercept[DatabaseAlreadyExistsException] {
+        // TODO: HiveExternalCatalog should throw DatabaseAlreadyExistsException
+        val e = intercept[AnalysisException] {
           sql(s"CREATE DATABASE $dbName")
-        }
+        }.getMessage
+        assert(e.contains(s"already exists"))
       } finally {
         catalog.reset()
       }
@@ -413,19 +532,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  test("desc table for parquet data source table using in-memory catalog") {
-    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
-    val tabName = "tab1"
-    withTable(tabName) {
-      sql(s"CREATE TABLE $tabName(a int comment 'test') USING parquet ")
-
-      checkAnswer(
-        sql(s"DESC $tabName").select("col_name", "data_type", "comment"),
-        Row("a", "int", "test")
-      )
-    }
-  }
-
   test("Alter/Describe Database") {
     val catalog = spark.sessionState.catalog
     val databaseNames = Seq("db1", "`database`")
@@ -433,7 +539,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     databaseNames.foreach { dbName =>
       try {
         val dbNameWithoutBackTicks = cleanIdentifier(dbName)
-        val location = makeQualifiedPath(s"spark-warehouse/$dbNameWithoutBackTicks.db")
+        val location = getDBPath(dbNameWithoutBackTicks)
 
         sql(s"CREATE DATABASE $dbName")
 
@@ -477,7 +583,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       var message = intercept[AnalysisException] {
         sql(s"DROP DATABASE $dbName")
       }.getMessage
-      assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+      // TODO: Unify the exception.
+      if (isUsingHiveMetastore) {
+        assert(message.contains(s"NoSuchObjectException: $dbNameWithoutBackTicks"))
+      } else {
+        assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found"))
+      }
 
       message = intercept[AnalysisException] {
         sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
@@ -506,7 +617,12 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val message = intercept[AnalysisException] {
       sql(s"DROP DATABASE $dbName RESTRICT")
     }.getMessage
-    assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+    // TODO: Unify the exception.
+    if (isUsingHiveMetastore) {
+      assert(message.contains(s"Database $dbName is not empty. One or more tables exist"))
+    } else {
+      assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
+    }
 
     catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
 
@@ -537,7 +653,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createTable(catalog, tableIdent1)
     val expectedTableIdent = tableIdent1.copy(database = Some("default"))
     val expectedTable = generateTable(catalog, expectedTableIdent)
-    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+    checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
   }
 
   test("create table in a specific db") {
@@ -546,7 +662,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
     createTable(catalog, tableIdent1)
     val expectedTable = generateTable(catalog, tableIdent1)
-    assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
+    checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
   }
 
   test("create table using") {
@@ -731,52 +847,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testSetLocation(isDatasourceTable = false)
   }
 
-  test("alter table: set location (datasource table)") {
-    testSetLocation(isDatasourceTable = true)
-  }
-
   test("alter table: set properties") {
     testSetProperties(isDatasourceTable = false)
   }
 
-  test("alter table: set properties (datasource table)") {
-    testSetProperties(isDatasourceTable = true)
-  }
-
   test("alter table: unset properties") {
     testUnsetProperties(isDatasourceTable = false)
   }
 
-  test("alter table: unset properties (datasource table)") {
-    testUnsetProperties(isDatasourceTable = true)
-  }
-
   // TODO: move this test to HiveDDLSuite.scala
   ignore("alter table: set serde") {
     testSetSerde(isDatasourceTable = false)
   }
 
-  test("alter table: set serde (datasource table)") {
-    testSetSerde(isDatasourceTable = true)
-  }
-
   // TODO: move this test to HiveDDLSuite.scala
   ignore("alter table: set serde partition") {
     testSetSerdePartition(isDatasourceTable = false)
   }
 
-  test("alter table: set serde partition (datasource table)") {
-    testSetSerdePartition(isDatasourceTable = true)
-  }
-
   test("alter table: change column") {
     testChangeColumn(isDatasourceTable = false)
   }
 
-  test("alter table: change column (datasource table)") {
-    testChangeColumn(isDatasourceTable = true)
-  }
-
   test("alter table: bucketing is not supported") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -805,10 +897,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testAddPartitions(isDatasourceTable = false)
   }
 
-  test("alter table: add partition (datasource table)") {
-    testAddPartitions(isDatasourceTable = true)
-  }
-
   test("alter table: recover partitions (sequential)") {
     withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
@@ -821,7 +909,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def testRecoverPartitions() {
+  protected def testRecoverPartitions() {
     val catalog = spark.sessionState.catalog
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -860,8 +948,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
         Set(part1, part2))
-      assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
-      assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+      if (!isUsingHiveMetastore) {
+        assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
+        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+      } else {
+        // After ALTER TABLE, the statistics of the first partition is removed by Hive megastore
+        assert(catalog.getPartition(tableIdent, part1).parameters.get("numFiles").isEmpty)
+        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
+      }
     } finally {
       fs.delete(root, true)
     }
@@ -875,10 +969,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testDropPartitions(isDatasourceTable = false)
   }
 
-  test("alter table: drop partition (datasource table)") {
-    testDropPartitions(isDatasourceTable = true)
-  }
-
   test("alter table: drop partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
   }
@@ -887,10 +977,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testRenamePartitions(isDatasourceTable = false)
   }
 
-  test("alter table: rename partition (datasource table)") {
-    testRenamePartitions(isDatasourceTable = true)
-  }
-
   test("show table extended") {
     withTempView("show1a", "show2b") {
       sql(
@@ -971,11 +1057,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     testDropTable(isDatasourceTable = false)
   }
 
-  test("drop table - data source table") {
-    testDropTable(isDatasourceTable = true)
-  }
-
-  private def testDropTable(isDatasourceTable: Boolean): Unit = {
+  protected def testDropTable(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
@@ -1011,9 +1093,10 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       tableIdent: TableIdentifier): Unit = {
     catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
       provider = Some("csv")))
+    assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
   }
 
-  private def testSetProperties(isDatasourceTable: Boolean): Unit = {
+  protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
@@ -1022,7 +1105,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     def getProps: Map[String, String] = {
-      catalog.getTableMetadata(tableIdent).properties
+      if (isUsingHiveMetastore) {
+        normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+      } else {
+        catalog.getTableMetadata(tableIdent).properties
+      }
     }
     assert(getProps.isEmpty)
     // set table properties
@@ -1038,7 +1125,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+  protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
@@ -1047,7 +1134,11 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     def getProps: Map[String, String] = {
-      catalog.getTableMetadata(tableIdent).properties
+      if (isUsingHiveMetastore) {
+        normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
+      } else {
+        catalog.getTableMetadata(tableIdent).properties
+      }
     }
     // unset table properties
     sql("ALTER TABLE dbx.tab1 SET TBLPROPERTIES ('j' = 'am', 'p' = 'an', 'c' = 'lan', 'x' = 'y')")
@@ -1071,7 +1162,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(getProps == Map("x" -> "y"))
   }
 
-  private def testSetLocation(isDatasourceTable: Boolean): Unit = {
+  protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val partSpec = Map("a" -> "1", "b" -> "2")
@@ -1082,24 +1173,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       convertToDatasourceTable(catalog, tableIdent)
     }
     assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
-    assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+    assert(normalizeSerdeProp(catalog.getTableMetadata(tableIdent).storage.properties).isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
-    assert(catalog.getPartition(tableIdent, partSpec).storage.properties.isEmpty)
+    assert(
+      normalizeSerdeProp(catalog.getPartition(tableIdent, partSpec).storage.properties).isEmpty)
+
     // Verify that the location is set to the expected string
     def verifyLocation(expected: URI, spec: Option[TablePartitionSpec] = None): Unit = {
       val storageFormat = spec
         .map { s => catalog.getPartition(tableIdent, s).storage }
         .getOrElse { catalog.getTableMetadata(tableIdent).storage }
-      if (isDatasourceTable) {
-        if (spec.isDefined) {
-          assert(storageFormat.properties.isEmpty)
-          assert(storageFormat.locationUri === Some(expected))
-        } else {
-          assert(storageFormat.locationUri === Some(expected))
-        }
-      } else {
-        assert(storageFormat.locationUri === Some(expected))
-      }
+      // TODO(gatorsmile): fix the bug in alter table set location.
+      // if (isUsingHiveMetastore) {
+      //  assert(storageFormat.properties.get("path") === expected)
+      // }
+      assert(storageFormat.locationUri === Some(expected))
     }
     // set table location
     sql("ALTER TABLE dbx.tab1 SET LOCATION '/path/to/your/lovely/heart'")
@@ -1124,7 +1212,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def testSetSerde(isDatasourceTable: Boolean): Unit = {
+  protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
@@ -1132,8 +1220,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
-    assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
+    def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+      val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
+      if (isUsingHiveMetastore) {
+        assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+      } else {
+        assert(serdeProp == expectedSerdeProps)
+      }
+    }
+    if (isUsingHiveMetastore) {
+      assert(catalog.getTableMetadata(tableIdent).storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    } else {
+      assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
+    }
+    checkSerdeProps(Map.empty[String, String])
     // set table serde and/or properties (should fail on datasource tables)
     if (isDatasourceTable) {
       val e1 = intercept[AnalysisException] {
@@ -1146,31 +1247,30 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       assert(e1.getMessage.contains("datasource"))
       assert(e2.getMessage.contains("datasource"))
     } else {
-      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.jadoop'")
-      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.jadoop"))
-      assert(catalog.getTableMetadata(tableIdent).storage.properties.isEmpty)
-      sql("ALTER TABLE dbx.tab1 SET SERDE 'org.apache.madoop' " +
+      val newSerde = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+      sql(s"ALTER TABLE dbx.tab1 SET SERDE '$newSerde'")
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(newSerde))
+      checkSerdeProps(Map.empty[String, String])
+      val serde2 = "org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe"
+      sql(s"ALTER TABLE dbx.tab1 SET SERDE '$serde2' " +
         "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
-      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some("org.apache.madoop"))
-      assert(catalog.getTableMetadata(tableIdent).storage.properties ==
-        Map("k" -> "v", "kay" -> "vee"))
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(serde2))
+      checkSerdeProps(Map("k" -> "v", "kay" -> "vee"))
     }
     // set serde properties only
     sql("ALTER TABLE dbx.tab1 SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
-    assert(catalog.getTableMetadata(tableIdent).storage.properties ==
-      Map("k" -> "vvv", "kay" -> "vee"))
+    checkSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
     // set things without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 SET SERDEPROPERTIES ('kay' = 'veee')")
-    assert(catalog.getTableMetadata(tableIdent).storage.properties ==
-      Map("k" -> "vvv", "kay" -> "veee"))
+    checkSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
     // table to alter does not exist
     intercept[AnalysisException] {
       sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
     }
   }
 
-  private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+  protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val spec = Map("a" -> "1", "b" -> "2")
@@ -1183,8 +1283,21 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
-    assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
-    assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+    def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
+      val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
+      if (isUsingHiveMetastore) {
+        assert(normalizeSerdeProp(serdeProp) == expectedSerdeProps)
+      } else {
+        assert(serdeProp == expectedSerdeProps)
+      }
+    }
+    if (isUsingHiveMetastore) {
+      assert(catalog.getPartition(tableIdent, spec).storage.serde ==
+        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    } else {
+      assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+    }
+    checkPartitionSerdeProps(Map.empty[String, String])
     // set table serde and/or properties (should fail on datasource tables)
     if (isDatasourceTable) {
       val e1 = intercept[AnalysisException] {
@@ -1199,26 +1312,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     } else {
       sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.jadoop'")
       assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.jadoop"))
-      assert(catalog.getPartition(tableIdent, spec).storage.properties.isEmpty)
+      checkPartitionSerdeProps(Map.empty[String, String])
       sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'org.apache.madoop' " +
         "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
       assert(catalog.getPartition(tableIdent, spec).storage.serde == Some("org.apache.madoop"))
-      assert(catalog.getPartition(tableIdent, spec).storage.properties ==
-        Map("k" -> "v", "kay" -> "vee"))
+      checkPartitionSerdeProps(Map("k" -> "v", "kay" -> "vee"))
     }
     // set serde properties only
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
         "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
-      assert(catalog.getPartition(tableIdent, spec).storage.properties ==
-        Map("k" -> "vvv", "kay" -> "vee"))
+      checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "vee"))
     }
     // set things without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     maybeWrapException(isDatasourceTable) {
       sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 'veee')")
-      assert(catalog.getPartition(tableIdent, spec).storage.properties ==
-        Map("k" -> "vvv", "kay" -> "veee"))
+      checkPartitionSerdeProps(Map("k" -> "vvv", "kay" -> "veee"))
     }
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1226,7 +1336,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+  protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "5")
@@ -1247,7 +1357,15 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
     assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
-    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(new URI("paris")))
+    val partitionLocation = if (isUsingHiveMetastore) {
+      val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
+      assert(tableLocation.isDefined)
+      makeQualifiedPath(new Path(tableLocation.get.toString, "paris"))
+    } else {
+      new URI("paris")
+    }
+
+    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
     assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
 
     // add partitions without explicitly specifying database
@@ -1277,7 +1395,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       Set(part1, part2, part3, part4, part5))
   }
 
-  private def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+  protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "5")
@@ -1330,7 +1448,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
-  private def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+  protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "q")
@@ -1374,7 +1492,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       Set(Map("a" -> "1", "b" -> "p"), Map("a" -> "20", "b" -> "c"), Map("a" -> "3", "b" -> "p")))
   }
 
-  private def testChangeColumn(isDatasourceTable: Boolean): Unit = {
+  protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val resolver = spark.sessionState.conf.resolver
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -1474,35 +1592,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     )
   }
 
-  test("create a managed Hive source table") {
-    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
-    val tabName = "tbl"
-    withTable(tabName) {
-      val e = intercept[AnalysisException] {
-        sql(s"CREATE TABLE $tabName (i INT, j STRING)")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE"))
-    }
-  }
-
-  test("create an external Hive source table") {
-    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
-    withTempDir { tempDir =>
-      val tabName = "tbl"
-      withTable(tabName) {
-        val e = intercept[AnalysisException] {
-          sql(
-            s"""
-               |CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
-               |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
-               |LOCATION '${tempDir.toURI}'
-           """.stripMargin)
-        }.getMessage
-        assert(e.contains("Hive support is required to CREATE Hive TABLE"))
-      }
-    }
-  }
-
   test("create a data source table without schema") {
     import testImplicits._
     withTempPath { tempDir =>
@@ -1541,22 +1630,6 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
-  test("Create Hive Table As Select") {
-    import testImplicits._
-    withTable("t", "t1") {
-      var e = intercept[AnalysisException] {
-        sql("CREATE TABLE t SELECT 1 as a, 1 as b")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
-
-      spark.range(1).select('id as 'a, 'id as 'b).write.saveAsTable("t1")
-      e = intercept[AnalysisException] {
-        sql("CREATE TABLE t SELECT a, b from t1")
-      }.getMessage
-      assert(e.contains("Hive support is required to CREATE Hive TABLE (AS SELECT)"))
-    }
-  }
-
   test("Create Data Source Table As Select") {
     import testImplicits._
     withTable("t", "t1", "t2") {
@@ -1580,7 +1653,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
   }
 
   test("drop default database") {
-    Seq("true", "false").foreach { caseSensitive =>
+    val caseSensitiveOptions = if (isUsingHiveMetastore) Seq("false") else Seq("true", "false")
+    caseSensitiveOptions.foreach { caseSensitive =>
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive) {
         var message = intercept[AnalysisException] {
           sql("DROP DATABASE default")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 9201954b66d10cfcf5ff9cdb366e23ba86a4a3a4..12fc8993d7396bab9c67c4e205a72263fc7296b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -306,6 +306,11 @@ private[sql] trait SQLTestUtils
     val fs = hadoopPath.getFileSystem(spark.sessionState.newHadoopConf())
     fs.makeQualified(hadoopPath).toUri
   }
+
+  def makeQualifiedPath(path: Path): URI = {
+    val fs = path.getFileSystem(spark.sessionState.newHadoopConf())
+    fs.makeQualified(path).toUri
+  }
 }
 
 private[sql] object SQLTestUtils {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 10d929a4a0ef8f8ad8e35a7d9ee81fd4762164ad..fce055048d72fd0625e32309e6e67e3eb46a1623 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -27,16 +27,88 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
-import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+// TODO(gatorsmile): combine HiveCatalogedDDLSuite and HiveDDLSuite
+class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeAndAfterEach {
+  override def afterEach(): Unit = {
+    try {
+      // drop all databases, tables and functions after each test
+      spark.sessionState.catalog.reset()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  protected override def generateTable(
+      catalog: SessionCatalog,
+      name: TableIdentifier): CatalogTable = {
+    val storage =
+      CatalogStorageFormat(
+        locationUri = Some(catalog.defaultTablePath(name)),
+        inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+        outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
+        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+        compressed = false,
+        properties = Map("serialization.format" -> "1"))
+    val metadata = new MetadataBuilder()
+      .putString("key", "value")
+      .build()
+    CatalogTable(
+      identifier = name,
+      tableType = CatalogTableType.EXTERNAL,
+      storage = storage,
+      schema = new StructType()
+        .add("col1", "int", nullable = true, metadata = metadata)
+        .add("col2", "string")
+        .add("a", "int")
+        .add("b", "int"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("a", "b"),
+      createTime = 0L,
+      tracksPartitionsInCatalog = true)
+  }
+
+  protected override def normalizeCatalogTable(table: CatalogTable): CatalogTable = {
+    val nondeterministicProps = Set(
+      "CreateTime",
+      "transient_lastDdlTime",
+      "grantTime",
+      "lastUpdateTime",
+      "last_modified_by",
+      "last_modified_time",
+      "Owner:",
+      "COLUMN_STATS_ACCURATE",
+      // The following are hive specific schema parameters which we do not need to match exactly.
+      "numFiles",
+      "numRows",
+      "rawDataSize",
+      "totalSize",
+      "totalNumberFiles",
+      "maxFileSize",
+      "minFileSize"
+    )
+
+    table.copy(
+      createTime = 0L,
+      lastAccessTime = 0L,
+      owner = "",
+      properties = table.properties.filterKeys(!nondeterministicProps.contains(_)),
+      // View texts are checked separately
+      viewText = None
+    )
+  }
+
+}
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1719,61 +1791,6 @@ class HiveDDLSuite
     }
   }
 
-  Seq("a b", "a:b", "a%b").foreach { specialChars =>
-    test(s"datasource table: location uri contains $specialChars") {
-      withTable("t", "t1") {
-        withTempDir { dir =>
-          val loc = new File(dir, specialChars)
-          loc.mkdir()
-          spark.sql(
-            s"""
-               |CREATE TABLE t(a string)
-               |USING parquet
-               |LOCATION '$loc'
-             """.stripMargin)
-
-          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
-          assert(new Path(table.location).toString.contains(specialChars))
-
-          assert(loc.listFiles().isEmpty)
-          spark.sql("INSERT INTO TABLE t SELECT 1")
-          assert(loc.listFiles().length >= 1)
-          checkAnswer(spark.table("t"), Row("1") :: Nil)
-        }
-
-        withTempDir { dir =>
-          val loc = new File(dir, specialChars)
-          loc.mkdir()
-          spark.sql(
-            s"""
-               |CREATE TABLE t1(a string, b string)
-               |USING parquet
-               |PARTITIONED BY(b)
-               |LOCATION '$loc'
-             """.stripMargin)
-
-          val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
-          assert(table.location == new Path(loc.getAbsolutePath).toUri)
-          assert(new Path(table.location).toString.contains(specialChars))
-
-          assert(loc.listFiles().isEmpty)
-          spark.sql("INSERT INTO TABLE t1 PARTITION(b=2) SELECT 1")
-          val partFile = new File(loc, "b=2")
-          assert(partFile.listFiles().length >= 1)
-          checkAnswer(spark.table("t1"), Row("1", "2") :: Nil)
-
-          spark.sql("INSERT INTO TABLE t1 PARTITION(b='2017-03-03 12:13%3A14') SELECT 1")
-          val partFile1 = new File(loc, "b=2017-03-03 12:13%3A14")
-          assert(!partFile1.exists())
-          val partFile2 = new File(loc, "b=2017-03-03 12%3A13%253A14")
-          assert(partFile2.listFiles().length >= 1)
-          checkAnswer(spark.table("t1"), Row("1", "2") :: Row("1", "2017-03-03 12:13%3A14") :: Nil)
-        }
-      }
-    }
-  }
-
   Seq("a b", "a:b", "a%b").foreach { specialChars =>
     test(s"hive table: location uri contains $specialChars") {
       withTable("t") {
@@ -1848,28 +1865,4 @@ class HiveDDLSuite
       }
     }
   }
-
-  Seq("a b", "a:b", "a%b").foreach { specialChars =>
-    test(s"location uri contains $specialChars for database") {
-      try {
-        withTable("t") {
-          withTempDir { dir =>
-            val loc = new File(dir, specialChars)
-            spark.sql(s"CREATE DATABASE tmpdb LOCATION '$loc'")
-            spark.sql("USE tmpdb")
-
-            Seq(1).toDF("a").write.saveAsTable("t")
-            val tblloc = new File(loc, "t")
-            val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
-            val tblPath = new Path(tblloc.getAbsolutePath)
-            val fs = tblPath.getFileSystem(spark.sessionState.newHadoopConf())
-            assert(table.location == makeQualifiedPath(tblloc.getAbsolutePath))
-            assert(tblloc.listFiles().nonEmpty)
-          }
-        }
-      } finally {
-        spark.sql("DROP DATABASE IF EXISTS tmpdb")
-      }
-    }
-  }
 }