From b1e639ab09d3a7a1545119e45a505c9a04308353 Mon Sep 17 00:00:00 2001
From: Xiao Li <gatorsmile@gmail.com>
Date: Tue, 2 May 2017 16:49:24 +0800
Subject: [PATCH] [SPARK-19235][SQL][TEST][FOLLOW-UP] Enable Test Cases in
 DDLSuite with Hive Metastore

### What changes were proposed in this pull request?
This is a follow-up to SPARK-19235, which enabled test cases in `DDLSuite` to run against a Hive metastore. It covers the following remaining tasks (the shared refactoring is sketched after this list):
- Run all the `alter table` and `drop table` DDL tests against data source tables when using the Hive metastore.
- Do not run any `alter table` or `drop table` DDL test against Hive serde tables when using `InMemoryCatalog`.
- Re-enable the `alter table: set serde partition` and `alter table: set serde` tests for Hive serde tables; they now live in `HiveCatalogedDDLSuite`.
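
The core of the refactoring: the old `convertToDatasourceTable` helper, which mutated a Hive table into a data source table after creation, is removed in favor of an `isDataSource` flag threaded through table generation, and every `testXxx` helper asserts up front that `InMemoryCatalog` is never asked to host a Hive serde table. A minimal sketch of the pattern, condensed from the diff below (the trait name is illustrative):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}

// Condensed sketch of the suite refactoring: table generation is
// parameterized by table kind, so each catalog implementation creates the
// right table up front instead of converting it afterwards.
trait DDLSuiteSketch {
  // True when the suite runs against a Hive metastore (HiveCatalogedDDLSuite).
  protected def isUsingHiveMetastore: Boolean

  // Overridden per catalog: InMemoryCatalogedDDLSuite always produces a data
  // source table; HiveCatalogedDDLSuite branches on the flag.
  protected def generateTable(
      catalog: SessionCatalog,
      name: TableIdentifier,
      isDataSource: Boolean = true): CatalogTable

  private def createTable(
      catalog: SessionCatalog,
      name: TableIdentifier,
      isDataSource: Boolean = true): Unit = {
    catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
  }

  // Every testXxx helper now rejects the unsupported combination up front:
  // InMemoryCatalog cannot host Hive serde tables.
  protected def testDropTable(isDatasourceTable: Boolean): Unit = {
    if (!isUsingHiveMetastore) {
      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
    }
    // ... create the table via createTable(catalog, ident, isDatasourceTable)
    // and exercise DROP TABLE, as in the full suite below ...
  }
}
```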

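The patch also touches the `withDatabase` fixture in `SQLTestUtils`: after dropping the test databases it now switches back to the default database, so a test that drops its own current database (like the reworked `drop current database` test) no longer leaks state into later tests. A minimal sketch of the fixture's post-patch contract, condensed from the `SQLTestUtils` hunk below; the trait wrapper and the `DEFAULT_DATABASE` import are assumptions for self-containment:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.DEFAULT_DATABASE

// Sketch of the fixture's contract after this patch; `spark` is supplied by
// the enclosing test trait in the real SQLTestUtils.
trait WithDatabaseSketch {
  protected def spark: SparkSession

  protected def withDatabase(dbNames: String*)(f: => Unit): Unit = {
    try f finally {
      // Best-effort cleanup: drop each database the test may have created ...
      dbNames.foreach { name =>
        spark.sql(s"DROP DATABASE IF EXISTS $name")
      }
      // ... then switch back to the default database, since the test body may
      // have dropped or switched away from its current database (the line
      // added by this patch).
      spark.sql(s"USE $DEFAULT_DATABASE")
    }
  }
}
```
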
### How was this patch tested?
N/A. This is a test-only change; it modifies `DDLSuite`, `SQLTestUtils`, and `HiveDDLSuite`.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #17524 from gatorsmile/cleanupDDLSuite.
---
 .../sql/execution/command/DDLSuite.scala      | 291 ++++++++----------
 .../apache/spark/sql/test/SQLTestUtils.scala  |   3 +-
 .../sql/hive/execution/HiveDDLSuite.scala     |  73 ++++-
 3 files changed, 195 insertions(+), 172 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2f4eb1b155..0abcff7606 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -49,7 +49,8 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
 
   protected override def generateTable(
       catalog: SessionCatalog,
-      name: TableIdentifier): CatalogTable = {
+      name: TableIdentifier,
+      isDataSource: Boolean = true): CatalogTable = {
     val storage =
       CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
     val metadata = new MetadataBuilder()
@@ -70,46 +71,6 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
       tracksPartitionsInCatalog = true)
   }
 
-  test("alter table: set location (datasource table)") {
-    testSetLocation(isDatasourceTable = true)
-  }
-
-  test("alter table: set properties (datasource table)") {
-    testSetProperties(isDatasourceTable = true)
-  }
-
-  test("alter table: unset properties (datasource table)") {
-    testUnsetProperties(isDatasourceTable = true)
-  }
-
-  test("alter table: set serde (datasource table)") {
-    testSetSerde(isDatasourceTable = true)
-  }
-
-  test("alter table: set serde partition (datasource table)") {
-    testSetSerdePartition(isDatasourceTable = true)
-  }
-
-  test("alter table: change column (datasource table)") {
-    testChangeColumn(isDatasourceTable = true)
-  }
-
-  test("alter table: add partition (datasource table)") {
-    testAddPartitions(isDatasourceTable = true)
-  }
-
-  test("alter table: drop partition (datasource table)") {
-    testDropPartitions(isDatasourceTable = true)
-  }
-
-  test("alter table: rename partition (datasource table)") {
-    testRenamePartitions(isDatasourceTable = true)
-  }
-
-  test("drop table - data source table") {
-    testDropTable(isDatasourceTable = true)
-  }
-
   test("create a managed Hive source table") {
     assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
     val tabName = "tbl"
@@ -163,7 +124,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
   }
 
-  protected def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable
+  protected def generateTable(
+      catalog: SessionCatalog,
+      name: TableIdentifier,
+      isDataSource: Boolean = true): CatalogTable
 
   private val escapedIdentifier = "`(.+)`".r
 
@@ -205,8 +169,11 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       ignoreIfExists = false)
   }
 
-  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
-    catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
+  private def createTable(
+      catalog: SessionCatalog,
+      name: TableIdentifier,
+      isDataSource: Boolean = true): Unit = {
+    catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -223,6 +190,46 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     new Path(CatalogUtils.URIToString(warehousePath), s"$dbName.db").toUri
   }
 
+  test("alter table: set location (datasource table)") {
+    testSetLocation(isDatasourceTable = true)
+  }
+
+  test("alter table: set properties (datasource table)") {
+    testSetProperties(isDatasourceTable = true)
+  }
+
+  test("alter table: unset properties (datasource table)") {
+    testUnsetProperties(isDatasourceTable = true)
+  }
+
+  test("alter table: set serde (datasource table)") {
+    testSetSerde(isDatasourceTable = true)
+  }
+
+  test("alter table: set serde partition (datasource table)") {
+    testSetSerdePartition(isDatasourceTable = true)
+  }
+
+  test("alter table: change column (datasource table)") {
+    testChangeColumn(isDatasourceTable = true)
+  }
+
+  test("alter table: add partition (datasource table)") {
+    testAddPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: drop partition (datasource table)") {
+    testDropPartitions(isDatasourceTable = true)
+  }
+
+  test("alter table: rename partition (datasource table)") {
+    testRenamePartitions(isDatasourceTable = true)
+  }
+
+  test("drop table - data source table") {
+    testDropTable(isDatasourceTable = true)
+  }
+
   test("the qualified path of a database is stored in the catalog") {
     val catalog = spark.sessionState.catalog
 
@@ -835,32 +842,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("alter table: set location") {
-    testSetLocation(isDatasourceTable = false)
-  }
-
-  test("alter table: set properties") {
-    testSetProperties(isDatasourceTable = false)
-  }
-
-  test("alter table: unset properties") {
-    testUnsetProperties(isDatasourceTable = false)
-  }
-
-  // TODO: move this test to HiveDDLSuite.scala
-  ignore("alter table: set serde") {
-    testSetSerde(isDatasourceTable = false)
-  }
-
-  // TODO: move this test to HiveDDLSuite.scala
-  ignore("alter table: set serde partition") {
-    testSetSerdePartition(isDatasourceTable = false)
-  }
-
-  test("alter table: change column") {
-    testChangeColumn(isDatasourceTable = false)
-  }
-
   test("alter table: bucketing is not supported") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -885,10 +866,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     assertUnsupported("ALTER TABLE dbx.tab1 NOT STORED AS DIRECTORIES")
   }
 
-  test("alter table: add partition") {
-    testAddPartitions(isDatasourceTable = false)
-  }
-
   test("alter table: recover partitions (sequential)") {
     withSQLConf("spark.rdd.parallelListingThreshold" -> "10") {
       testRecoverPartitions()
@@ -957,17 +934,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     assertUnsupported("ALTER VIEW dbx.tab1 ADD IF NOT EXISTS PARTITION (b='2')")
   }
 
-  test("alter table: drop partition") {
-    testDropPartitions(isDatasourceTable = false)
-  }
-
   test("alter table: drop partition is not supported for views") {
     assertUnsupported("ALTER VIEW dbx.tab1 DROP IF EXISTS PARTITION (b='2')")
   }
 
-  test("alter table: rename partition") {
-    testRenamePartitions(isDatasourceTable = false)
-  }
 
   test("show databases") {
     sql("CREATE DATABASE showdb2B")
@@ -1011,18 +981,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     assert(catalog.listTables("default") == Nil)
   }
 
-  test("drop table") {
-    testDropTable(isDatasourceTable = false)
-  }
-
   protected def testDropTable(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    createTable(catalog, tableIdent, isDatasourceTable)
     assert(catalog.listTables("dbx") == Seq(tableIdent))
     sql("DROP TABLE dbx.tab1")
     assert(catalog.listTables("dbx") == Nil)
@@ -1046,22 +1012,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       e.getMessage.contains("Cannot drop a table with DROP VIEW. Please use DROP TABLE instead"))
   }
 
-  private def convertToDatasourceTable(
-      catalog: SessionCatalog,
-      tableIdent: TableIdentifier): Unit = {
-    catalog.alterTable(catalog.getTableMetadata(tableIdent).copy(
-      provider = Some("csv")))
-    assert(catalog.getTableMetadata(tableIdent).provider == Some("csv"))
-  }
-
   protected def testSetProperties(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    createTable(catalog, tableIdent, isDatasourceTable)
     def getProps: Map[String, String] = {
       if (isUsingHiveMetastore) {
         normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
@@ -1084,13 +1042,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testUnsetProperties(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    createTable(catalog, tableIdent, isDatasourceTable)
     def getProps: Map[String, String] = {
       if (isUsingHiveMetastore) {
         normalizeCatalogTable(catalog.getTableMetadata(tableIdent)).properties
@@ -1121,15 +1079,15 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testSetLocation(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val partSpec = Map("a" -> "1", "b" -> "2")
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
+    createTable(catalog, tableIdent, isDatasourceTable)
     createTablePartition(catalog, partSpec, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
     assert(catalog.getTableMetadata(tableIdent).storage.locationUri.isDefined)
     assert(normalizeSerdeProp(catalog.getTableMetadata(tableIdent).storage.properties).isEmpty)
     assert(catalog.getPartition(tableIdent, partSpec).storage.locationUri.isDefined)
@@ -1171,13 +1129,13 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    createTable(catalog, tableIdent, isDatasourceTable)
     def checkSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
       val serdeProp = catalog.getTableMetadata(tableIdent).storage.properties
       if (isUsingHiveMetastore) {
@@ -1187,8 +1145,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
     if (isUsingHiveMetastore) {
-      assert(catalog.getTableMetadata(tableIdent).storage.serde ==
-        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      val expectedSerde = if (isDatasourceTable) {
+        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+      } else {
+        "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
+      }
+      assert(catalog.getTableMetadata(tableIdent).storage.serde == Some(expectedSerde))
     } else {
       assert(catalog.getTableMetadata(tableIdent).storage.serde.isEmpty)
     }
@@ -1229,18 +1191,18 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val spec = Map("a" -> "1", "b" -> "2")
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
+    createTable(catalog, tableIdent, isDatasourceTable)
     createTablePartition(catalog, spec, tableIdent)
     createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
     createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
     createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
     def checkPartitionSerdeProps(expectedSerdeProps: Map[String, String]): Unit = {
       val serdeProp = catalog.getPartition(tableIdent, spec).storage.properties
       if (isUsingHiveMetastore) {
@@ -1250,8 +1212,12 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
       }
     }
     if (isUsingHiveMetastore) {
-      assert(catalog.getPartition(tableIdent, spec).storage.serde ==
-        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      val expectedSerde = if (isDatasourceTable) {
+        "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+      } else {
+        "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
+      }
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == Some(expectedSerde))
     } else {
       assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
     }
@@ -1295,6 +1261,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "5")
@@ -1303,11 +1272,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     val part4 = Map("a" -> "4", "b" -> "8")
     val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
+    createTable(catalog, tableIdent, isDatasourceTable)
     createTablePartition(catalog, part1, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
     // basic add partition
@@ -1354,6 +1320,9 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "5")
@@ -1362,7 +1331,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     val part4 = Map("a" -> "4", "b" -> "8")
     val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
+    createTable(catalog, tableIdent, isDatasourceTable)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
@@ -1370,9 +1339,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     createTablePartition(catalog, part5, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
       Set(part1, part2, part3, part4, part5))
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
 
     // basic drop partition
     sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
@@ -1407,20 +1373,20 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testRenamePartitions(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     val part1 = Map("a" -> "1", "b" -> "q")
     val part2 = Map("a" -> "2", "b" -> "c")
     val part3 = Map("a" -> "3", "b" -> "p")
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
+    createTable(catalog, tableIdent, isDatasourceTable)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
 
     // basic rename partition
     sql("ALTER TABLE dbx.tab1 PARTITION (a='1', b='q') RENAME TO PARTITION (a='100', b='p')")
@@ -1451,14 +1417,14 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   protected def testChangeColumn(isDatasourceTable: Boolean): Unit = {
+    if (!isUsingHiveMetastore) {
+      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
+    }
     val catalog = spark.sessionState.catalog
     val resolver = spark.sessionState.conf.resolver
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
     createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent)
-    if (isDatasourceTable) {
-      convertToDatasourceTable(catalog, tableIdent)
-    }
+    createTable(catalog, tableIdent, isDatasourceTable)
     def getMetadata(colName: String): Metadata = {
       val column = catalog.getTableMetadata(tableIdent).schema.fields.find { field =>
         resolver(field.name, colName)
@@ -1601,13 +1567,15 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("drop current database") {
-    sql("CREATE DATABASE temp")
-    sql("USE temp")
-    sql("DROP DATABASE temp")
-    val e = intercept[AnalysisException] {
+    withDatabase("temp") {
+      sql("CREATE DATABASE temp")
+      sql("USE temp")
+      sql("DROP DATABASE temp")
+      val e = intercept[AnalysisException] {
         sql("CREATE TABLE t (a INT, b INT) USING parquet")
       }.getMessage
-    assert(e.contains("Database 'temp' not found"))
+      assert(e.contains("Database 'temp' not found"))
+    }
   }
 
   test("drop default database") {
@@ -1837,22 +1805,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
         checkAnswer(spark.table("tbl"), Row(1))
         val defaultTablePath = spark.sessionState.catalog
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
-
-        sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
-        spark.catalog.refreshTable("tbl")
-        // SET LOCATION won't move data from previous table path to new table path.
-        assert(spark.table("tbl").count() == 0)
-        // the previous table path should be still there.
-        assert(new File(defaultTablePath).exists())
-
-        sql("INSERT INTO tbl SELECT 2")
-        checkAnswer(spark.table("tbl"), Row(2))
-        // newly inserted data will go to the new table path.
-        assert(dir.listFiles().nonEmpty)
-
-        sql("DROP TABLE tbl")
-        // the new table path will be removed after DROP TABLE.
-        assert(!dir.exists())
+        try {
+          sql(s"ALTER TABLE tbl SET LOCATION '${dir.toURI}'")
+          spark.catalog.refreshTable("tbl")
+          // SET LOCATION won't move data from previous table path to new table path.
+          assert(spark.table("tbl").count() == 0)
+          // the previous table path should be still there.
+          assert(new File(defaultTablePath).exists())
+
+          sql("INSERT INTO tbl SELECT 2")
+          checkAnswer(spark.table("tbl"), Row(2))
+          // newly inserted data will go to the new table path.
+          assert(dir.listFiles().nonEmpty)
+
+          sql("DROP TABLE tbl")
+          // the new table path will be removed after DROP TABLE.
+          assert(!dir.exists())
+        } finally {
+          Utils.deleteRecursively(new File(defaultTablePath))
+        }
       }
     }
   }
@@ -2125,7 +2096,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
 
   Seq("a b", "a:b", "a%b").foreach { specialChars =>
     test(s"location uri contains $specialChars for database") {
-      try {
+      withDatabase("tmpdb") {
         withTable("t") {
           withTempDir { dir =>
             val loc = new File(dir, specialChars)
@@ -2140,8 +2111,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
             assert(tblloc.listFiles().nonEmpty)
           }
         }
-      } finally {
-        spark.sql("DROP DATABASE IF EXISTS tmpdb")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 44c0fc70d0..f6d47734d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -237,7 +237,7 @@ private[sql] trait SQLTestUtils
 
     try f(dbName) finally {
       if (spark.catalog.currentDatabase == dbName) {
-        spark.sql(s"USE ${DEFAULT_DATABASE}")
+        spark.sql(s"USE $DEFAULT_DATABASE")
       }
       spark.sql(s"DROP DATABASE $dbName CASCADE")
     }
@@ -251,6 +251,7 @@ private[sql] trait SQLTestUtils
       dbNames.foreach { name =>
         spark.sql(s"DROP DATABASE IF EXISTS $name")
       }
+      spark.sql(s"USE $DEFAULT_DATABASE")
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 16a99321ba..341e03b5e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.orc.OrcFileOperator
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
@@ -50,15 +50,28 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
 
   protected override def generateTable(
       catalog: SessionCatalog,
-      name: TableIdentifier): CatalogTable = {
+      name: TableIdentifier,
+      isDataSource: Boolean): CatalogTable = {
     val storage =
-      CatalogStorageFormat(
-        locationUri = Some(catalog.defaultTablePath(name)),
-        inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
-        outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
-        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
-        compressed = false,
-        properties = Map("serialization.format" -> "1"))
+      if (isDataSource) {
+        val serde = HiveSerDe.sourceToSerDe("parquet")
+        assert(serde.isDefined, "The default format is not Hive compatible")
+        CatalogStorageFormat(
+          locationUri = Some(catalog.defaultTablePath(name)),
+          inputFormat = serde.get.inputFormat,
+          outputFormat = serde.get.outputFormat,
+          serde = serde.get.serde,
+          compressed = false,
+          properties = Map("serialization.format" -> "1"))
+      } else {
+        CatalogStorageFormat(
+          locationUri = Some(catalog.defaultTablePath(name)),
+          inputFormat = Some("org.apache.hadoop.mapred.SequenceFileInputFormat"),
+          outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat"),
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
+          compressed = false,
+          properties = Map("serialization.format" -> "1"))
+      }
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
@@ -71,7 +84,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
         .add("col2", "string")
         .add("a", "int")
         .add("b", "int"),
-      provider = Some("hive"),
+      provider = if (isDataSource) Some("parquet") else Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L,
       tracksPartitionsInCatalog = true)
@@ -107,6 +120,46 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     )
   }
 
+  test("alter table: set location") {
+    testSetLocation(isDatasourceTable = false)
+  }
+
+  test("alter table: set properties") {
+    testSetProperties(isDatasourceTable = false)
+  }
+
+  test("alter table: unset properties") {
+    testUnsetProperties(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde") {
+    testSetSerde(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde partition") {
+    testSetSerdePartition(isDatasourceTable = false)
+  }
+
+  test("alter table: change column") {
+    testChangeColumn(isDatasourceTable = false)
+  }
+
+  test("alter table: rename partition") {
+    testRenamePartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: drop partition") {
+    testDropPartitions(isDatasourceTable = false)
+  }
+
+  test("alter table: add partition") {
+    testAddPartitions(isDatasourceTable = false)
+  }
+
+  test("drop table") {
+    testDropTable(isDatasourceTable = false)
+  }
+
 }
 
 class HiveDDLSuite
-- 
GitLab