diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index a649daf5a1c1343b31c3cd93ef689d4e48791f5e..66fad85ea02639fe9655aecd9d8aea92df6d564d 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -176,6 +176,10 @@
       <groupId>org.apache.thrift</groupId>
       <artifactId>libfb303</artifactId>
     </dependency>
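+    <!-- Derby provides the embedded database behind the local Hive metastore used in tests. -->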
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-compiler</artifactId>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
deleted file mode 100644
index 3bd3d0d6db355a6cf8dfe339fca7309c979dfdae..0000000000000000000000000000000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.net.URI
-
-import org.apache.hadoop.fs.Path
-import org.scalatest.BeforeAndAfterEach
-
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-
-
-class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest
-  with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-
-  val tempDir = Utils.createTempDir().getCanonicalFile
-  val tempDirUri = tempDir.toURI
-  val tempDirStr = tempDir.getAbsolutePath
-
-  override def beforeEach(): Unit = {
-    sql("CREATE DATABASE test_db")
-    for ((tbl, _) <- rawTablesAndExpectations) {
-      hiveClient.createTable(tbl, ignoreIfExists = false)
-    }
-  }
-
-  override def afterEach(): Unit = {
-    Utils.deleteRecursively(tempDir)
-    hiveClient.dropDatabase("test_db", ignoreIfNotExists = false, cascade = true)
-  }
-
-  private def getTableMetadata(tableName: String): CatalogTable = {
-    spark.sharedState.externalCatalog.getTable("test_db", tableName)
-  }
-
-  private def defaultTableURI(tableName: String): URI = {
-    spark.sessionState.catalog.defaultTablePath(TableIdentifier(tableName, Some("test_db")))
-  }
-
-  // Raw table metadata that are dumped from tables created by Spark 2.0. Note that, all spark
-  // versions prior to 2.1 would generate almost same raw table metadata for a specific table.
-  val simpleSchema = new StructType().add("i", "int")
-  val partitionedSchema = new StructType().add("i", "int").add("j", "int")
-
-  lazy val hiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl1", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = simpleSchema)
-
-  lazy val externalHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl2", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDirUri),
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = simpleSchema)
-
-  lazy val partitionedHiveTable = CatalogTable(
-    identifier = TableIdentifier("tbl3", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"),
-      outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-    schema = partitionedSchema,
-    partitionColumnNames = Seq("j"))
-
-
-  val simpleSchemaJson =
-    """
-      |{
-      | "type": "struct",
-      | "fields": [{
-      |             "name": "i",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            }]
-      |}
-    """.stripMargin
-
-  val partitionedSchemaJson =
-    """
-      |{
-      | "type": "struct",
-      | "fields": [{
-      |             "name": "i",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            },
-      |            {
-      |             "name": "j",
-      |             "type": "integer",
-      |             "nullable": true,
-      |             "metadata": {}
-      |            }]
-      |}
-    """.stripMargin
-
-  lazy val dataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl4", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl4").toString)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val hiveCompatibleDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl5", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl5").toString)),
-    schema = simpleSchema,
-    provider = Some("parquet"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "parquet",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val partitionedDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl6", Some("test_db")),
-    tableType = CatalogTableType.MANAGED,
-    storage = CatalogStorageFormat.empty.copy(
-      properties = Map("path" -> defaultTableURI("tbl6").toString)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> partitionedSchemaJson,
-      "spark.sql.sources.schema.numPartCols" -> "1",
-      "spark.sql.sources.schema.partCol.0" -> "j"))
-
-  lazy val externalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl7", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new URI(defaultTableURI("tbl7") + "-__PLACEHOLDER__")),
-      properties = Map("path" -> tempDirStr)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map(
-      "spark.sql.sources.provider" -> "json",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val hiveCompatibleExternalDataSourceTable = CatalogTable(
-    identifier = TableIdentifier("tbl8", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(tempDirUri),
-      properties = Map("path" -> tempDirStr)),
-    schema = simpleSchema,
-    properties = Map(
-      "spark.sql.sources.provider" -> "parquet",
-      "spark.sql.sources.schema.numParts" -> "1",
-      "spark.sql.sources.schema.part.0" -> simpleSchemaJson))
-
-  lazy val dataSourceTableWithoutSchema = CatalogTable(
-    identifier = TableIdentifier("tbl9", Some("test_db")),
-    tableType = CatalogTableType.EXTERNAL,
-    storage = CatalogStorageFormat.empty.copy(
-      locationUri = Some(new URI(defaultTableURI("tbl9") + "-__PLACEHOLDER__")),
-      properties = Map("path" -> tempDirStr)),
-    schema = new StructType(),
-    provider = Some("json"),
-    properties = Map("spark.sql.sources.provider" -> "json"))
-
-  // A list of all raw tables we want to test, with their expected schema.
-  lazy val rawTablesAndExpectations = Seq(
-    hiveTable -> simpleSchema,
-    externalHiveTable -> simpleSchema,
-    partitionedHiveTable -> partitionedSchema,
-    dataSourceTable -> simpleSchema,
-    hiveCompatibleDataSourceTable -> simpleSchema,
-    partitionedDataSourceTable -> partitionedSchema,
-    externalDataSourceTable -> simpleSchema,
-    hiveCompatibleExternalDataSourceTable -> simpleSchema,
-    dataSourceTableWithoutSchema -> new StructType())
-
-  test("make sure we can read table created by old version of Spark") {
-    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
-      val readBack = getTableMetadata(tbl.identifier.table)
-      assert(readBack.schema.sameType(expectedSchema))
-
-      if (tbl.tableType == CatalogTableType.EXTERNAL) {
-        // trim the URI prefix
-        val tableLocation = readBack.storage.locationUri.get.getPath
-        val expectedLocation = tempDir.toURI.getPath.stripSuffix("/")
-        assert(tableLocation == expectedLocation)
-      }
-    }
-  }
-
-  test("make sure we can alter table location created by old version of Spark") {
-    withTempDir { dir =>
-      for ((tbl, _) <- rawTablesAndExpectations if tbl.tableType == CatalogTableType.EXTERNAL) {
-        val path = dir.toURI.toString.stripSuffix("/")
-        sql(s"ALTER TABLE ${tbl.identifier} SET LOCATION '$path'")
-
-        val readBack = getTableMetadata(tbl.identifier.table)
-
-        // trim the URI prefix
-        val actualTableLocation = readBack.storage.locationUri.get.getPath
-        val expected = dir.toURI.getPath.stripSuffix("/")
-        assert(actualTableLocation == expected)
-      }
-    }
-  }
-
-  test("make sure we can rename table created by old version of Spark") {
-    for ((tbl, expectedSchema) <- rawTablesAndExpectations) {
-      val newName = tbl.identifier.table + "_renamed"
-      sql(s"ALTER TABLE ${tbl.identifier} RENAME TO $newName")
-
-      val readBack = getTableMetadata(newName)
-      assert(readBack.schema.sameType(expectedSchema))
-
-      // trim the URI prefix
-      val actualTableLocation = readBack.storage.locationUri.get.getPath
-      val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) {
-        tempDir.toURI.getPath.stripSuffix("/")
-      } else {
-        // trim the URI prefix
-        defaultTableURI(newName).getPath
-      }
-      assert(actualTableLocation == expectedLocation)
-    }
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2928a734a7e366000669be5552b62ef843aec0f2
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.nio.file.Files
+
+import org.apache.spark.TestUtils
+import org.apache.spark.sql.{QueryTest, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+/**
+ * Tests HiveExternalCatalog backward compatibility.
+ *
+ * Note that this test suite automatically downloads Spark binary packages of different versions
+ * to a local directory `/tmp/spark-test`. If a Spark folder with the expected version already
+ * exists under this directory, e.g. `/tmp/spark-test/spark-2.0.2`, the download for that version
+ * is skipped.
+ */
+class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
+  private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
+  private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
+  private val sparkTestingDir = "/tmp/spark-test"
+  private val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+
+  override def afterAll(): Unit = {
+    Utils.deleteRecursively(wareHousePath)
+    Utils.deleteRecursively(tmpDataDir)
+    super.afterAll()
+  }
+
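+  // Downloads a pre-built Spark release tarball into `sparkTestingDir` and unpacks it into a
+  // `spark-$version` directory, so subsequent runs can reuse the extracted distribution.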
+  private def downloadSpark(version: String): Unit = {
+    import scala.sys.process._
+
+    val url = s"https://d3kbcqa49mib13.cloudfront.net/spark-$version-bin-hadoop2.7.tgz"
+
+    Seq("wget", url, "-q", "-P", sparkTestingDir).!
+
+    val downloaded = new File(sparkTestingDir, s"spark-$version-bin-hadoop2.7.tgz").getCanonicalPath
+    val targetDir = new File(sparkTestingDir, s"spark-$version").getCanonicalPath
+
+    Seq("mkdir", targetDir).!
+
+    Seq("tar", "-xzf", downloaded, "-C", targetDir, "--strip-components=1").!
+
+    Seq("rm", downloaded).!
+  }
+
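+  // Returns a path under `tmpDataDir` for generated test data with the given name.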
+  private def genDataDir(name: String): String = {
+    new File(tmpDataDir, name).getCanonicalPath
+  }
+
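+  // Writes a small PySpark script that creates test tables and a view, then runs it with each
+  // old Spark version via spark-submit, pointing every run at the same warehouse and Derby
+  // metastore so the current build can read the resulting catalog back.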
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val tempPyFile = File.createTempFile("test", ".py")
+    Files.write(tempPyFile.toPath,
+      s"""
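+        |# Create data source tables of several flavors, plus a permanent view, all suffixed
+        |# with the version index, so they can be read back later by the current Spark build.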
+        |from pyspark.sql import SparkSession
+        |
+        |spark = SparkSession.builder.enableHiveSupport().getOrCreate()
+        |version_index = spark.conf.get("spark.sql.test.version.index", None)
+        |
+        |spark.sql("create table data_source_tbl_{} using json as select 1 i".format(version_index))
+        |
+        |spark.sql("create table hive_compatible_data_source_tbl_" + version_index + \\
+        |          " using parquet as select 1 i")
+        |
+        |json_file = "${genDataDir("json_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file)
+        |spark.sql("create table external_data_source_tbl_" + version_index + \\
+        |          "(i int) using json options (path '{}')".format(json_file))
+        |
+        |parquet_file = "${genDataDir("parquet_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.parquet(parquet_file)
+        |spark.sql("create table hive_compatible_external_data_source_tbl_" + version_index + \\
+        |          "(i int) using parquet options (path '{}')".format(parquet_file))
+        |
+        |json_file2 = "${genDataDir("json2_")}" + str(version_index)
+        |spark.range(1, 2).selectExpr("cast(id as int) as i").write.json(json_file2)
+        |spark.sql("create table external_table_without_schema_" + version_index + \\
+        |          " using json options (path '{}')".format(json_file2))
+        |
+        |spark.sql("create view v_{} as select 1 i".format(version_index))
+      """.stripMargin.getBytes("utf8"))
+
+    PROCESS_TABLES.testingVersions.zipWithIndex.foreach { case (version, index) =>
+      val sparkHome = new File(sparkTestingDir, s"spark-$version")
+      if (!sparkHome.exists()) {
+        downloadSpark(version)
+      }
+
+      val args = Seq(
+        "--name", "prepare testing tables",
+        "--master", "local[2]",
+        "--conf", "spark.ui.enabled=false",
+        "--conf", "spark.master.rest.enabled=false",
+        "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+        "--conf", s"spark.sql.test.version.index=$index",
+        "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+        tempPyFile.getCanonicalPath)
+      runSparkSubmit(args, Some(sparkHome.getCanonicalPath))
+    }
+
+    tempPyFile.delete()
+  }
+
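+  // Uses the current Spark build to read back the catalog populated above, by submitting
+  // PROCESS_TABLES against the shared warehouse and Derby metastore.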
+  test("backward compatibility") {
+    val args = Seq(
+      "--class", PROCESS_TABLES.getClass.getName.stripSuffix("$"),
+      "--name", "HiveExternalCatalog backward compatibility test",
+      "--master", "local[2]",
+      "--conf", "spark.ui.enabled=false",
+      "--conf", "spark.master.rest.enabled=false",
+      "--conf", s"spark.sql.warehouse.dir=${wareHousePath.getCanonicalPath}",
+      "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
+      unusedJar.toString)
+    runSparkSubmit(args)
+  }
+}
+
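+// Run through spark-submit by the test above: checks that tables and views created by older
+// Spark versions can still be queried, inserted into, renamed, and have their location altered.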
+object PROCESS_TABLES extends QueryTest with SQLTestUtils {
+  // Tests the latest version of every release line.
+  val testingVersions = Seq("2.0.2", "2.1.1", "2.2.0")
+
+  protected var spark: SparkSession = _
+
+  def main(args: Array[String]): Unit = {
+    val session = SparkSession.builder()
+      .enableHiveSupport()
+      .getOrCreate()
+    spark = session
+
+    testingVersions.indices.foreach { index =>
+      Seq(
+        s"data_source_tbl_$index",
+        s"hive_compatible_data_source_tbl_$index",
+        s"external_data_source_tbl_$index",
+        s"hive_compatible_external_data_source_tbl_$index",
+        s"external_table_without_schema_$index").foreach { tbl =>
+        val tableMeta = spark.sharedState.externalCatalog.getTable("default", tbl)
+
+        // make sure we can insert and query these tables.
+        session.sql(s"insert into $tbl select 2")
+        checkAnswer(session.sql(s"select * from $tbl"), Row(1) :: Row(2) :: Nil)
+        checkAnswer(session.sql(s"select i from $tbl where i > 1"), Row(2))
+
+        // make sure we can rename table.
+        val newName = tbl + "_renamed"
+        sql(s"ALTER TABLE $tbl RENAME TO $newName")
+        val readBack = spark.sharedState.externalCatalog.getTable("default", newName)
+
+        val actualTableLocation = readBack.storage.locationUri.get.getPath
+        val expectedLocation = if (tableMeta.tableType == CatalogTableType.EXTERNAL) {
+          tableMeta.storage.locationUri.get.getPath
+        } else {
+          spark.sessionState.catalog.defaultTablePath(TableIdentifier(newName, None)).getPath
+        }
+        assert(actualTableLocation == expectedLocation)
+
+        // make sure we can alter table location.
+        withTempDir { dir =>
+          val path = dir.toURI.toString.stripSuffix("/")
+          sql(s"ALTER TABLE $newName SET LOCATION '$path'")
+          val readBack = spark.sharedState.externalCatalog.getTable("default", newName)
+          val actualTableLocation = readBack.storage.locationUri.get.getPath
+          val expected = dir.toURI.getPath.stripSuffix("/")
+          assert(actualTableLocation == expected)
+        }
+      }
+
+      // test permanent view
+      checkAnswer(sql(s"select i from v_$index"), Row(1))
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index be6aa6d8dc3c9a0e3958e468d5d5e995742ee5d9..21b3e281490cffe31933438c2439593c4edaa834 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -18,17 +18,11 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedWriter, File, FileWriter}
-import java.sql.Timestamp
-import java.util.Date
 
-import scala.collection.mutable.ArrayBuffer
 import scala.tools.nsc.Properties
 
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfterEach, Matchers}
-import org.scalatest.concurrent.TimeLimits
-import org.scalatest.exceptions.TestFailedDueToTimeoutException
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
@@ -38,7 +32,6 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.expressions.Window
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
@@ -46,11 +39,10 @@ import org.apache.spark.util.{ResetSystemProperties, Utils}
  * This suite tests spark-submit with applications using HiveContext.
  */
 class HiveSparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
-  with ResetSystemProperties
-  with TimeLimits {
+  with ResetSystemProperties {
 
   // TODO: rewrite these or mark them as slow tests to be run sparingly
 
@@ -333,71 +325,6 @@ class HiveSparkSubmitSuite
       unusedJar.toString)
     runSparkSubmit(argsForShowTables)
   }
-
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
-  private def runSparkSubmit(args: Seq[String]): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val history = ArrayBuffer.empty[String]
-    val sparkSubmit = if (Utils.isWindows) {
-      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
-      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
-    } else {
-      "./bin/spark-submit"
-    }
-    val commands = Seq(sparkSubmit) ++ args
-    val commandLine = commands.mkString("'", "' '", "'")
-
-    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
-    val env = builder.environment()
-    env.put("SPARK_TESTING", "1")
-    env.put("SPARK_HOME", sparkHome)
-
-    def captureOutput(source: String)(line: String): Unit = {
-      // This test suite has some weird behaviors when executed on Jenkins:
-      //
-      // 1. Sometimes it gets extremely slow out of unknown reason on Jenkins.  Here we add a
-      //    timestamp to provide more diagnosis information.
-      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
-      //    them out for debugging purposes.
-      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
-      // scalastyle:off println
-      println(logLine)
-      // scalastyle:on println
-      history += logLine
-    }
-
-    val process = builder.start()
-    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
-    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
-
-    try {
-      val exitCode = failAfter(300.seconds) { process.waitFor() }
-      if (exitCode != 0) {
-        // include logs in output. Note that logging is async and may not have completed
-        // at the time this exception is raised
-        Thread.sleep(1000)
-        val historyLog = history.mkString("\n")
-        fail {
-          s"""spark-submit returned with exit code $exitCode.
-             |Command line: $commandLine
-             |
-             |$historyLog
-           """.stripMargin
-        }
-      }
-    } catch {
-      case to: TestFailedDueToTimeoutException =>
-        val historyLog = history.mkString("\n")
-        fail(s"Timeout of $commandLine" +
-            s" See the log4j logs for more detail." +
-            s"\n$historyLog", to)
-        case t: Throwable => throw t
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
 }
 
 object SetMetastoreURLTest extends Logging {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 83cee5d1b8a42e9152e0bd02211d2860f41fbce8..29b0e6c8533ef2f974445a63a9533c8a36d50006 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1354,31 +1354,4 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
-
-  test("SPARK-18464: support old table which doesn't store schema in table properties") {
-    withTable("old") {
-      withTempPath { path =>
-        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
-        val tableDesc = CatalogTable(
-          identifier = TableIdentifier("old", Some("default")),
-          tableType = CatalogTableType.EXTERNAL,
-          storage = CatalogStorageFormat.empty.copy(
-            properties = Map("path" -> path.getAbsolutePath)
-          ),
-          schema = new StructType(),
-          provider = Some("parquet"),
-          properties = Map(
-            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
-        hiveClient.createTable(tableDesc, ignoreIfExists = false)
-
-        checkAnswer(spark.table("old"), Row(1, "a"))
-        checkAnswer(sql("select * from old"), Row(1, "a"))
-
-        val expectedSchema = StructType(Seq(
-          StructField("i", IntegerType, nullable = true),
-          StructField("j", StringType, nullable = true)))
-        assert(table("old").schema === expectedSchema)
-      }
-    }
-  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ede44df4afe1107a7bd6fca76588958b6a15883c
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+import java.sql.Timestamp
+import java.util.Date
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.concurrent.TimeLimits
+import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
+import org.apache.spark.util.Utils
+
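+// Helper for test suites that launch applications through `spark-submit`, factored out of
+// HiveSparkSubmitSuite so it can also be used by HiveExternalCatalogVersionsSuite.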
+trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
+
+  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
+  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
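+  // `sparkHomeOpt` lets callers run spark-submit from a different Spark installation (e.g. a
+  // downloaded older release); by default the current build's `spark.test.home` is used.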
+  protected def runSparkSubmit(args: Seq[String], sparkHomeOpt: Option[String] = None): Unit = {
+    val sparkHome = sparkHomeOpt.getOrElse(
+      sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
+    val history = ArrayBuffer.empty[String]
+    val sparkSubmit = if (Utils.isWindows) {
+      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+    } else {
+      "./bin/spark-submit"
+    }
+    val commands = Seq(sparkSubmit) ++ args
+    val commandLine = commands.mkString("'", "' '", "'")
+
+    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
+    val env = builder.environment()
+    env.put("SPARK_TESTING", "1")
+    env.put("SPARK_HOME", sparkHome)
+
+    def captureOutput(source: String)(line: String): Unit = {
+      // Tests run through this helper show some weird behaviors when executed on Jenkins:
+      //
+      // 1. Sometimes they get extremely slow on Jenkins for unknown reasons. Here we add a
+      //    timestamp to provide more diagnostic information.
+      // 2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
+      //    them out for debugging purposes.
+      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
+      // scalastyle:off println
+      println(logLine)
+      // scalastyle:on println
+      history += logLine
+    }
+
+    val process = builder.start()
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
+
+    try {
+      val exitCode = failAfter(300.seconds) { process.waitFor() }
+      if (exitCode != 0) {
+        // include logs in output. Note that logging is async and may not have completed
+        // at the time this exception is raised
+        Thread.sleep(1000)
+        val historyLog = history.mkString("\n")
+        fail {
+          s"""spark-submit returned with exit code $exitCode.
+             |Command line: $commandLine
+             |
+             |$historyLog
+           """.stripMargin
+        }
+      }
+    } catch {
+      case to: TestFailedDueToTimeoutException =>
+        val historyLog = history.mkString("\n")
+        fail(s"Timeout of $commandLine." +
+          " See the log4j logs for more detail." +
+          s"\n$historyLog", to)
+      case t: Throwable => throw t
+    } finally {
+      // Ensure we still kill the process in case it timed out
+      process.destroy()
+    }
+  }
+}