diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index cc80f2e481cbf3ed7d752bb62ea22ebd72c831bf..e93c6549f1538de0dc939de5260f0642ee39f119 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -50,47 +50,53 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
   }
 
   test("insertInto() HiveTable") {
-    sql("CREATE TABLE createAndInsertTest (key int, value string)")
-
-    // Add some data.
-    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
-    // Make sure the table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
-
-    // Add more data.
-    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
-
-    // Make sure the table has been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
-    )
-
-    // Now overwrite.
-    testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
+    withTable("createAndInsertTest") {
+      sql("CREATE TABLE createAndInsertTest (key int, value string)")
+
+      // Add some data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has also been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+
+      // Add more data.
+      testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
+
+      // Make sure the table has been updated.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.toDF().collect().toSeq ++ testData.toDF().collect().toSeq
+      )
+
+      // Now overwrite.
+      testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
+
+      // Make sure the table has been overwritten.
+      checkAnswer(
+        sql("SELECT * FROM createAndInsertTest"),
+        testData.collect().toSeq
+      )
+    }
   }
 
   test("Double create fails when allowExisting = false") {
-    sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-
-    intercept[AnalysisException] {
+    withTable("doubleCreateAndInsertTest") {
       sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+
+      intercept[AnalysisException] {
+        sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      }
     }
   }
 
   test("Double create does not fail when allowExisting = true") {
-    sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
-    sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
+    withTable("doubleCreateAndInsertTest") {
+      sql("CREATE TABLE doubleCreateAndInsertTest (key int, value string)")
+      sql("CREATE TABLE IF NOT EXISTS doubleCreateAndInsertTest (key int, value string)")
+    }
   }
 
   test("SPARK-4052: scala.collection.Map as value type of MapType") {
@@ -268,29 +274,33 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 
   test("Test partition mode = strict") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "strict")) {
-      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
+      withTable("partitioned") {
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd"))
           .toDF("id", "data", "part")
 
-      intercept[SparkException] {
-        data.write.insertInto("partitioned")
+        intercept[SparkException] {
+          data.write.insertInto("partitioned")
+        }
       }
     }
   }
 
   test("Detect table partitioning") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
-      sql("CREATE TABLE source (id bigint, data string, part string)")
-      val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
+      withTable("source", "partitioned") {
+        sql("CREATE TABLE source (id bigint, data string, part string)")
+        val data = (1 to 10).map(i => (i, s"data-$i", if ((i % 2) == 0) "even" else "odd")).toDF()
 
-      data.write.insertInto("source")
-      checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
+        data.write.insertInto("source")
+        checkAnswer(sql("SELECT * FROM source"), data.collect().toSeq)
 
-      sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
-      // this will pick up the output partitioning from the table definition
-      spark.table("source").write.insertInto("partitioned")
+        sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
+        // this will pick up the output partitioning from the table definition
+        spark.table("source").write.insertInto("partitioned")
 
-      checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+        checkAnswer(sql("SELECT * FROM partitioned"), data.collect().toSeq)
+      }
     }
   }
 
@@ -461,19 +471,23 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
 
   testPartitionedTable("insertInto() should reject missing columns") {
     tableName =>
-      sql("CREATE TABLE t (a INT, b INT)")
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
 
-      intercept[AnalysisException] {
-        spark.table("t").write.insertInto(tableName)
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
       }
   }
 
   testPartitionedTable("insertInto() should reject extra columns") {
     tableName =>
-      sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT, c INT, d INT, e INT)")
 
-      intercept[AnalysisException] {
-        spark.table("t").write.insertInto(tableName)
+        intercept[AnalysisException] {
+          spark.table("t").write.insertInto(tableName)
+        }
       }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 799abc1d0c42f4abafc71bf5344c499e4d922d5c..2ea51791d0f79346dcd209a2776e7d5d468e7621 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -370,21 +370,23 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
     """.stripMargin)
 
   test("SPARK-7270: consider dynamic partition when comparing table output") {
-    sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
-    sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
+    withTable("test_partition", "ptest") {
+      sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
+      sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
 
-    val analyzedPlan = sql(
-      """
+      val analyzedPlan = sql(
+        """
         |INSERT OVERWRITE table test_partition PARTITION (b=1, c)
         |SELECT 'a', 'c' from ptest
       """.stripMargin).queryExecution.analyzed
 
-    assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
+      assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
-      var hasCast = false
+        var hasCast = false
-      analyzedPlan.collect {
-        case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
+        analyzedPlan.collect {
+          case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
+        }
+        hasCast
       }
-      hasCast
     }
   }
 
@@ -435,13 +437,13 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
   test("transform with SerDe2") {
     assume(TestUtils.testCommandAvailable("/bin/bash"))
+    withTable("small_src") {
+      sql("CREATE TABLE small_src(key INT, value STRING)")
+      sql("INSERT OVERWRITE TABLE small_src SELECT key, value FROM src LIMIT 10")
 
-    sql("CREATE TABLE small_src(key INT, value STRING)")
-    sql("INSERT OVERWRITE TABLE small_src SELECT key, value FROM src LIMIT 10")
-
-    val expected = sql("SELECT key FROM small_src").collect().head
-    val res = sql(
-      """
+      val expected = sql("SELECT key FROM small_src").collect().head
+      val res = sql(
+        """
         |SELECT TRANSFORM (key) ROW FORMAT SERDE
         |'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
         |WITH SERDEPROPERTIES ('avro.schema.literal'='{"namespace":
@@ -453,7 +455,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
         |FROM small_src
       """.stripMargin.replaceAll(System.lineSeparator(), " ")).collect().head
 
-    assert(expected(0) === res(0))
+      assert(expected(0) === res(0))
+    }
   }
 
   createQueryTest("transform with SerDe3",
@@ -780,22 +783,26 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 
   test("Exactly once semantics for DDL and command statements") {
     val tableName = "test_exactly_once"
-    val q0 = sql(s"CREATE TABLE $tableName(key INT, value STRING)")
+    withTable(tableName) {
+      val q0 = sql(s"CREATE TABLE $tableName(key INT, value STRING)")
 
-    // If the table was not created, the following assertion would fail
-    assert(Try(table(tableName)).isSuccess)
+      // If the table was not created, the following assertion would fail
+      assert(Try(table(tableName)).isSuccess)
 
-    // If the CREATE TABLE command got executed again, the following assertion would fail
-    assert(Try(q0.count()).isSuccess)
+      // If the CREATE TABLE command got executed again, the following assertion would fail
+      assert(Try(q0.count()).isSuccess)
+    }
   }
 
   test("SPARK-2263: Insert Map<K, V> values") {
-    sql("CREATE TABLE m(value MAP<INT, STRING>)")
-    sql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10")
-    sql("SELECT * FROM m").collect().zip(sql("SELECT * FROM src LIMIT 10").collect()).foreach {
-      case (Row(map: Map[_, _]), Row(key: Int, value: String)) =>
-        assert(map.size === 1)
-        assert(map.head === ((key, value)))
+    withTable("m") {
+      sql("CREATE TABLE m(value MAP<INT, STRING>)")
+      sql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10")
+      sql("SELECT * FROM m").collect().zip(sql("SELECT * FROM src LIMIT 10").collect()).foreach {
+        case (Row(map: Map[_, _]), Row(key: Int, value: String)) =>
+          assert(map.size === 1)
+          assert(map.head === ((key, value)))
+      }
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index ae64cb3210b533ff441f3973f57713a9c0837bc6..3f9bb8de42e09451c7e8de0ac38b919a0cfd4044 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -81,14 +81,16 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
   }
 
   test("Spark-4959 Attributes are case sensitive when using a select query from a projection") {
-    sql("create table spark_4959 (col1 string)")
-    sql("""insert into table spark_4959 select "hi" from src limit 1""")
-    table("spark_4959").select(
-      'col1.as("CaseSensitiveColName"),
-      'col1.as("CaseSensitiveColName2")).createOrReplaceTempView("spark_4959_2")
-
-    assert(sql("select CaseSensitiveColName from spark_4959_2").head() === Row("hi"))
-    assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
+    withTable("spark_4959") {
+      sql("create table spark_4959 (col1 string)")
+      sql("""insert into table spark_4959 select "hi" from src limit 1""")
+      table("spark_4959").select(
+        'col1.as("CaseSensitiveColName"),
+        'col1.as("CaseSensitiveColName2")).createOrReplaceTempView("spark_4959_2")
+
+      assert(sql("select CaseSensitiveColName from spark_4959_2").head() === Row("hi"))
+      assert(sql("select casesensitivecolname from spark_4959_2").head() === Row("hi"))
+    }
   }
 
   private def checkNumScannedPartitions(stmt: String, expectedNumParts: Int): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
index 383d41f907c6d07e1c3dcb9d70f17d8f9a801c02..6198d4963df33e58ea67a29b55f1c6d3e957e981 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala
@@ -74,26 +74,27 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
   }
 
   test("hive struct udf") {
-    sql(
-      """
-      |CREATE TABLE hiveUDFTestTable (
-      |   pair STRUCT<id: INT, value: INT>
-      |)
-      |PARTITIONED BY (partition STRING)
-      |ROW FORMAT SERDE '%s'
-      |STORED AS SEQUENCEFILE
-    """.
-        stripMargin.format(classOf[PairSerDe].getName))
-
-    val location = Utils.getSparkClassLoader.getResource("data/files/testUDF").getFile
-    sql(s"""
-      ALTER TABLE hiveUDFTestTable
-      ADD IF NOT EXISTS PARTITION(partition='testUDF')
-      LOCATION '$location'""")
-
-    sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[PairUDF].getName}'")
-    sql("SELECT testUDF(pair) FROM hiveUDFTestTable")
-    sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF")
+    withTable("hiveUDFTestTable") {
+      sql(
+        """
+          |CREATE TABLE hiveUDFTestTable (
+          |   pair STRUCT<id: INT, value: INT>
+          |)
+          |PARTITIONED BY (partition STRING)
+          |ROW FORMAT SERDE '%s'
+          |STORED AS SEQUENCEFILE
+        """.stripMargin.format(classOf[PairSerDe].getName))
+
+      val location = Utils.getSparkClassLoader.getResource("data/files/testUDF").getFile
+      sql(s"""
+        ALTER TABLE hiveUDFTestTable
+        ADD IF NOT EXISTS PARTITION(partition='testUDF')
+        LOCATION '$location'""")
+
+      sql(s"CREATE TEMPORARY FUNCTION testUDF AS '${classOf[PairUDF].getName}'")
+      sql("SELECT testUDF(pair) FROM hiveUDFTestTable")
+      sql("DROP TEMPORARY FUNCTION IF EXISTS testUDF")
+    }
   }
 
   test("Max/Min on named_struct") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 85a6a77cedc4f16044f2ce28f624c226247e7ca6..09c59000b3e3f1a9dec14a8b709589f780e74a4f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -136,49 +136,51 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     orders.toDF.createOrReplaceTempView("orders1")
     orderUpdates.toDF.createOrReplaceTempView("orderupdates1")
 
-    sql(
-      """CREATE TABLE orders(
-        |  id INT,
-        |  make String,
-        |  type String,
-        |  price INT,
-        |  pdate String,
-        |  customer String,
-        |  city String)
-        |PARTITIONED BY (state STRING, month INT)
-        |STORED AS PARQUET
-      """.stripMargin)
+    withTable("orders", "orderupdates") {
+      sql(
+        """CREATE TABLE orders(
+          |  id INT,
+          |  make String,
+          |  type String,
+          |  price INT,
+          |  pdate String,
+          |  customer String,
+          |  city String)
+          |PARTITIONED BY (state STRING, month INT)
+          |STORED AS PARQUET
+        """.stripMargin)
 
-    sql(
-      """CREATE TABLE orderupdates(
-        |  id INT,
-        |  make String,
-        |  type String,
-        |  price INT,
-        |  pdate String,
-        |  customer String,
-        |  city String)
-        |PARTITIONED BY (state STRING, month INT)
-        |STORED AS PARQUET
-      """.stripMargin)
+      sql(
+        """CREATE TABLE orderupdates(
+          |  id INT,
+          |  make String,
+          |  type String,
+          |  price INT,
+          |  pdate String,
+          |  customer String,
+          |  city String)
+          |PARTITIONED BY (state STRING, month INT)
+          |STORED AS PARQUET
+        """.stripMargin)
 
-    sql("set hive.exec.dynamic.partition.mode=nonstrict")
-    sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
-    sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
+      sql("set hive.exec.dynamic.partition.mode=nonstrict")
+      sql("INSERT INTO TABLE orders PARTITION(state, month) SELECT * FROM orders1")
+      sql("INSERT INTO TABLE orderupdates PARTITION(state, month) SELECT * FROM orderupdates1")
 
-    checkAnswer(
-      sql(
-        """
-          |select orders.state, orders.month
-          |from orders
-          |join (
-          |  select distinct orders.state,orders.month
-          |  from orders
-          |  join orderupdates
-          |    on orderupdates.id = orders.id) ao
-          |  on ao.state = orders.state and ao.month = orders.month
-        """.stripMargin),
-      (1 to 6).map(_ => Row("CA", 20151)))
+      checkAnswer(
+        sql(
+          """
+            |select orders.state, orders.month
+            |from orders
+            |join (
+            |  select distinct orders.state,orders.month
+            |  from orders
+            |  join orderupdates
+            |    on orderupdates.id = orders.id) ao
+            |  on ao.state = orders.state and ao.month = orders.month
+          """.stripMargin),
+        (1 to 6).map(_ => Row("CA", 20151)))
+    }
   }
 
   test("show functions") {
@@ -349,21 +351,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("CTAS with WITH clause") {
     val df = Seq((1, 1)).toDF("c1", "c2")
     df.createOrReplaceTempView("table1")
-
-    sql(
-      """
-        |CREATE TABLE with_table1 AS
-        |WITH T AS (
-        |  SELECT *
-        |  FROM table1
-        |)
-        |SELECT *
-        |FROM T
-      """.stripMargin)
-    val query = sql("SELECT * FROM with_table1")
-    checkAnswer(query, Row(1, 1) :: Nil)
+    withTable("with_table1") {
+      sql(
+        """
+          |CREATE TABLE with_table1 AS
+          |WITH T AS (
+          |  SELECT *
+          |  FROM table1
+          |)
+          |SELECT *
+          |FROM T
+        """.stripMargin)
+      val query = sql("SELECT * FROM with_table1")
+      checkAnswer(query, Row(1, 1) :: Nil)
+    }
   }
 
   test("explode nested Field") {
@@ -564,86 +567,90 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("CTAS with serde") {
-    sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
-    sql(
-      """CREATE TABLE ctas2
-        | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
-        | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
-        | STORED AS RCFile
-        | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
-        | AS
-        |   SELECT key, value
-        |   FROM src
-        |   ORDER BY key, value""".stripMargin)
-
-    val storageCtas2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("ctas2")).storage
-    assert(storageCtas2.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
-    assert(storageCtas2.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
-    assert(storageCtas2.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
-
-    sql(
-      """CREATE TABLE ctas3
-        | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012'
-        | STORED AS textfile AS
-        |   SELECT key, value
-        |   FROM src
-        |   ORDER BY key, value""".stripMargin)
-
-    // the table schema may like (key: integer, value: string)
-    sql(
-      """CREATE TABLE IF NOT EXISTS ctas4 AS
-        | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin)
-    // do nothing cause the table ctas4 already existed.
-    sql(
-      """CREATE TABLE IF NOT EXISTS ctas4 AS
-        | SELECT key, value FROM src ORDER BY key, value""".stripMargin)
+    withTable("ctas1", "ctas2", "ctas3", "ctas4", "ctas5") {
+      sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
+      sql(
+        """CREATE TABLE ctas2
+          | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+          | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+          | STORED AS RCFile
+          | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+          | AS
+          |   SELECT key, value
+          |   FROM src
+          |   ORDER BY key, value""".stripMargin)
+
+      val storageCtas2 = spark.sessionState.catalog
+        .getTableMetadata(TableIdentifier("ctas2")).storage
+      assert(storageCtas2.inputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
+      assert(storageCtas2.outputFormat == Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
+      assert(storageCtas2.serde == Some("org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"))
 
-    checkAnswer(
-      sql("SELECT k, value FROM ctas1 ORDER BY k, value"),
-      sql("SELECT key, value FROM src ORDER BY key, value"))
-    checkAnswer(
-      sql("SELECT key, value FROM ctas2 ORDER BY key, value"),
       sql(
-        """
+        """CREATE TABLE ctas3
+          | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012'
+          | STORED AS textfile AS
+          |   SELECT key, value
+          |   FROM src
+          |   ORDER BY key, value""".stripMargin)
+
+      // the table schema may look like (key: integer, value: string)
+      sql(
+        """CREATE TABLE IF NOT EXISTS ctas4 AS
+          | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin)
+      // does nothing because the table ctas4 already exists.
+      sql(
+        """CREATE TABLE IF NOT EXISTS ctas4 AS
+          | SELECT key, value FROM src ORDER BY key, value""".stripMargin)
+
+      checkAnswer(
+        sql("SELECT k, value FROM ctas1 ORDER BY k, value"),
+        sql("SELECT key, value FROM src ORDER BY key, value"))
+      checkAnswer(
+        sql("SELECT key, value FROM ctas2 ORDER BY key, value"),
+        sql(
+          """
           SELECT key, value
           FROM src
           ORDER BY key, value"""))
-    checkAnswer(
-      sql("SELECT key, value FROM ctas3 ORDER BY key, value"),
-      sql(
-        """
+      checkAnswer(
+        sql("SELECT key, value FROM ctas3 ORDER BY key, value"),
+        sql(
+          """
           SELECT key, value
           FROM src
           ORDER BY key, value"""))
-    intercept[AnalysisException] {
-      sql(
-        """CREATE TABLE ctas4 AS
-          | SELECT key, value FROM src ORDER BY key, value""".stripMargin)
-    }
-    checkAnswer(
-      sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
-      sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
-
-    sql(
-      """CREATE TABLE ctas5
-        | STORED AS parquet AS
-        |   SELECT key, value
-        |   FROM src
-        |   ORDER BY key, value""".stripMargin)
-    val storageCtas5 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("ctas5")).storage
-    assert(storageCtas5.inputFormat ==
-      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
-    assert(storageCtas5.outputFormat ==
-      Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
-    assert(storageCtas5.serde ==
-      Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
-
-
-    // use the Hive SerDe for parquet tables
-    withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+      intercept[AnalysisException] {
+        sql(
+          """CREATE TABLE ctas4 AS
+            | SELECT key, value FROM src ORDER BY key, value""".stripMargin)
+      }
       checkAnswer(
-        sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
-        sql("SELECT key, value FROM src ORDER BY key, value"))
+        sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
+        sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
+
+      sql(
+        """CREATE TABLE ctas5
+          | STORED AS parquet AS
+          |   SELECT key, value
+          |   FROM src
+          |   ORDER BY key, value""".stripMargin)
+      val storageCtas5 = spark.sessionState.catalog
+        .getTableMetadata(TableIdentifier("ctas5")).storage
+      assert(storageCtas5.inputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(storageCtas5.outputFormat ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(storageCtas5.serde ==
+        Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+
+      // use the Hive SerDe for parquet tables
+      withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false") {
+        checkAnswer(
+          sql("SELECT key, value FROM ctas5 ORDER BY key, value"),
+          sql("SELECT key, value FROM src ORDER BY key, value"))
+      }
     }
   }
 
@@ -716,40 +723,46 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   }
 
   test("double nested data") {
-    sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
-      .toDF().createOrReplaceTempView("nested")
-    checkAnswer(
-      sql("SELECT f1.f2.f3 FROM nested"),
-      Row(1))
+    withTable("test_ctas_1234") {
+      sparkContext.parallelize(Nested1(Nested2(Nested3(1))) :: Nil)
+        .toDF().createOrReplaceTempView("nested")
+      checkAnswer(
+        sql("SELECT f1.f2.f3 FROM nested"),
+        Row(1))
 
-    sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested")
-    checkAnswer(
-      sql("SELECT * FROM test_ctas_1234"),
-      sql("SELECT * FROM nested").collect().toSeq)
+      sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested")
+      checkAnswer(
+        sql("SELECT * FROM test_ctas_1234"),
+        sql("SELECT * FROM nested").collect().toSeq)
 
-    intercept[AnalysisException] {
-      sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect()
+      intercept[AnalysisException] {
+        sql("CREATE TABLE test_ctas_1234 AS SELECT * from notexists").collect()
+      }
     }
   }
 
   test("test CTAS") {
-    sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src")
-    checkAnswer(
-      sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
-      sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+    withTable("test_ctas_1234") {
+      sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src")
+      checkAnswer(
+        sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
+        sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+    }
   }
 
   test("SPARK-4825 save join to table") {
-    val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
-    sql("CREATE TABLE test1 (key INT, value STRING)")
-    testData.write.mode(SaveMode.Append).insertInto("test1")
-    sql("CREATE TABLE test2 (key INT, value STRING)")
-    testData.write.mode(SaveMode.Append).insertInto("test2")
-    testData.write.mode(SaveMode.Append).insertInto("test2")
-    sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
-    checkAnswer(
-      table("test"),
-      sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
+    withTable("test1", "test2", "test") {
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test1 (key INT, value STRING)")
+      testData.write.mode(SaveMode.Append).insertInto("test1")
+      sql("CREATE TABLE test2 (key INT, value STRING)")
+      testData.write.mode(SaveMode.Append).insertInto("test2")
+      testData.write.mode(SaveMode.Append).insertInto("test2")
+      sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
+      checkAnswer(
+        table("test"),
+        sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)
+    }
   }
 
   test("SPARK-3708 Backticks aren't handled correctly is aliases") {
@@ -1843,14 +1856,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
 
   test("SPARK-17108: Fix BIGINT and INT comparison failure in spark sql") {
-    sql("create table t1(a map<bigint, array<string>>)")
-    sql("select * from t1 where a[1] is not null")
+    withTable("t1", "t2", "t3") {
+      sql("create table t1(a map<bigint, array<string>>)")
+      sql("select * from t1 where a[1] is not null")
 
-    sql("create table t2(a map<int, array<string>>)")
-    sql("select * from t2 where a[1] is not null")
+      sql("create table t2(a map<int, array<string>>)")
+      sql("select * from t2 where a[1] is not null")
 
-    sql("create table t3(a map<bigint, array<string>>)")
-    sql("select * from t3 where a[1L] is not null")
+      sql("create table t3(a map<bigint, array<string>>)")
+      sql("select * from t3 where a[1L] is not null")
+    }
   }
 
   test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {