diff --git a/sql/core/src/test/resources/sql-tests/inputs/blacklist.sql b/sql/core/src/test/resources/sql-tests/inputs/blacklist.sql
new file mode 100644
index 0000000000000000000000000000000000000000..d69f8147a526473f0914cb9521c477c758ac167d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/blacklist.sql
@@ -0,0 +1,4 @@
+-- This is a query file that has been blacklisted.
+-- It includes a query that should crash Spark.
+-- If this test case were run, the whole suite would fail.
+some random not working query that should crash Spark.
diff --git a/sql/core/src/test/resources/sql-tests/inputs/number-format.sql b/sql/core/src/test/resources/sql-tests/inputs/number-format.sql
new file mode 100644
index 0000000000000000000000000000000000000000..60076a84315826daa36f6b2768ecd926a7ee6aec
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/number-format.sql
@@ -0,0 +1,13 @@
+-- Verifies how we parse numbers
+
+-- parse as ints
+select 1, -1;
+
+-- parse as longs
+select 2147483648, -2147483649;
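+-- (2147483648 is Int.MaxValue + 1 and -2147483649 is Int.MinValue - 1, so neither fits in an int)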
+
+-- parse as decimals
+select 9223372036854775808, -9223372036854775809;
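+-- (9223372036854775808 is Long.MaxValue + 1 and -9223372036854775809 is Long.MinValue - 1,
+-- so neither fits in a bigint)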
+
+-- various floating point (decimal) formats
+select 0.3, -0.8, .5, -.18, 0.1111;
diff --git a/sql/core/src/test/resources/sql-tests/results/number-format.sql.out b/sql/core/src/test/resources/sql-tests/results/number-format.sql.out
new file mode 100644
index 0000000000000000000000000000000000000000..4b800b7d92560c116eb286b9172a3a0f68acfdae
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/number-format.sql.out
@@ -0,0 +1,34 @@
+-- Automatically generated by org.apache.spark.sql.SQLQueryTestSuite
+-- Number of queries: 4
+
+
+-- !query 0
+select 1, -1
+-- !query 0 schema
+struct<1:int,(-1):int>
+-- !query 0 output
+1	-1
+
+
+-- !query 1
+select 2147483648, -2147483649
+-- !query 1 schema
+struct<2147483648:bigint,(-2147483649):bigint>
+-- !query 1 output
+2147483648	-2147483649
+
+
+-- !query 2
+select 9223372036854775808, -9223372036854775809
+-- !query 2 schema
+struct<9223372036854775808:decimal(19,0),(-9223372036854775809):decimal(19,0)>
+-- !query 2 output
+9223372036854775808	-9223372036854775809
+
+
+-- !query 3
+select 0.3, -0.8, .5, -.18, 0.1111
+-- !query 3 schema
+struct<0.3:decimal(1,1),(-0.8):decimal(1,1),0.5:decimal(1,1),(-0.18):decimal(2,2),0.1111:decimal(4,4)>
+-- !query 3 output
+0.3	-0.8	0.5	-0.18	0.1111
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4ba324aa8cee77ede04963465251e1971126e748..a0130dd48c2ff26c8a49cfd72b2616cafefa4de6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1368,42 +1368,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     )
   }
 
-  test("Floating point number format") {
-    checkAnswer(
-      sql("SELECT 0.3"), Row(BigDecimal(0.3))
-    )
-
-    checkAnswer(
-      sql("SELECT -0.8"), Row(BigDecimal(-0.8))
-    )
-
-    checkAnswer(
-      sql("SELECT .5"), Row(BigDecimal(0.5))
-    )
-
-    checkAnswer(
-      sql("SELECT -.18"), Row(BigDecimal(-0.18))
-    )
-  }
-
-  test("Auto cast integer type") {
-    checkAnswer(
-      sql(s"SELECT ${Int.MaxValue + 1L}"), Row(Int.MaxValue + 1L)
-    )
-
-    checkAnswer(
-      sql(s"SELECT ${Int.MinValue - 1L}"), Row(Int.MinValue - 1L)
-    )
-
-    checkAnswer(
-      sql("SELECT 9223372036854775808"), Row(new java.math.BigDecimal("9223372036854775808"))
-    )
-
-    checkAnswer(
-      sql("SELECT -9223372036854775809"), Row(new java.math.BigDecimal("-9223372036854775809"))
-    )
-  }
-
   test("Test to check we can apply sign to expression") {
 
     checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..08b8432d68eb35dc9ddf4a4af91dda4a8c1954a3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.util.{Locale, TimeZone}
+
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * End-to-end test cases for SQL queries.
+ *
+ * Each case is loaded from a file in "spark/sql/core/src/test/resources/sql-tests/inputs".
+ * Each case has a golden result file in "spark/sql/core/src/test/resources/sql-tests/results".
+ *
+ * To re-generate golden files, run:
+ * {{{
+ *   SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/test-only *SQLQueryTestSuite"
+ * }}}
+ *
+ * The format for input files is simple:
+ *  1. A list of SQL queries separated by semicolons.
+ *  2. Lines starting with -- are treated as comments and ignored.
+ *
+ * For example:
+ * {{{
+ *   -- this is a comment
+ *   select 1, -1;
+ *   select current_date;
+ * }}}
+ *
+ * The format for golden result files looks roughly like this:
+ * {{{
+ *   -- some header information
+ *
+ *   -- !query 0
+ *   select 1, -1
+ *   -- !query 0 schema
+ *   struct<...schema...>
+ *   -- !query 0 output
+ *   ... data row 1 ...
+ *   ... data row 2 ...
+ *   ...
+ *
+ *   -- !query 1
+ *   ...
+ * }}}
+ */
+class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
+
+  private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
+
+  private val baseResourcePath = {
+    // If regenerateGoldenFiles is true, we must be running this in SBT, so we use a hard-coded
+    // relative path. Otherwise, we use the classloader's getResource to find the location.
+    if (regenerateGoldenFiles) {
+      java.nio.file.Paths.get("src", "test", "resources", "sql-tests").toFile
+    } else {
+      val res = getClass.getClassLoader.getResource("sql-tests")
+      new File(res.getFile)
+    }
+  }
+
+  private val inputFilePath = new File(baseResourcePath, "inputs").getAbsolutePath
+  private val goldenFilePath = new File(baseResourcePath, "results").getAbsolutePath
+
+  /** List of test cases to ignore, in lower case. */
+  private val blackList = Set(
+    "blacklist.sql"  // Do NOT remove this one. It is here to test the blacklist functionality.
+  )
+
+  // Create all the test cases.
+  listTestCases().foreach(createScalaTestCase)
+
+  /** A test case. */
+  private case class TestCase(name: String, inputFile: String, resultFile: String)
+
+  /** A single SQL query's output. */
+  private case class QueryOutput(sql: String, schema: String, output: String) {
+    def toString(queryIndex: Int): String = {
+      // We explicitly avoid a multi-line string here, because stripMargin would strip any
+      // "|" characters in the output.
+      s"-- !query $queryIndex\n" +
+        sql + "\n" +
+        s"-- !query $queryIndex schema\n" +
+        schema + "\n" +
+         s"-- !query $queryIndex output\n" +
+        output
+    }
+  }
+
+  private def createScalaTestCase(testCase: TestCase): Unit = {
+    if (blackList.contains(testCase.name.toLowerCase)) {
+      // Create a test case to ignore this case.
+      ignore(testCase.name) { /* Do nothing */ }
+    } else {
+      // Create a test case to run this case.
+      test(testCase.name) { runTest(testCase) }
+    }
+  }
+
+  /** Run a test case. */
+  private def runTest(testCase: TestCase): Unit = {
+    val input = fileToString(new File(testCase.inputFile))
+
+    // List of SQL queries to run
+    val queries: Seq[String] = {
+      val cleaned = input.split("\n").filterNot(_.startsWith("--")).mkString("\n")
+      // Note: splitting queries on semicolons like this is not robust, but it works for now.
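+      // e.g. "select 1;\nselect 2;" yields Seq("select 1", "select 2"); the lookbehind in the
+      // pattern keeps an escaped "\;" from being treated as a query terminator.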
+      cleaned.split("(?<=[^\\\\]);").map(_.trim).filter(_ != "").toSeq
+    }
+
+    // Run the SQL queries, preparing the results for comparison.
+    val outputs: Seq[QueryOutput] = queries.map { sql =>
+      val df = spark.sql(sql)
+      // We might need to do some query canonicalization in the future.
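+      // hiveResultString() renders each row as a tab-separated string, which is what the
+      // golden files store (e.g. "1\t-1" for "select 1, -1").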
+      QueryOutput(
+        sql = sql,
+        schema = df.schema.catalogString,
+        output = df.queryExecution.hiveResultString().mkString("\n"))
+    }
+
+    if (regenerateGoldenFiles) {
+      // Again, we explicitly avoid a multi-line string because stripMargin would strip "|".
+      val goldenOutput = {
+        s"-- Automatically generated by ${getClass.getName}\n" +
+        s"-- Number of queries: ${outputs.size}\n\n\n" +
+        outputs.zipWithIndex.map { case (qr, i) => qr.toString(i) }.mkString("\n\n\n") + "\n"
+      }
+      stringToFile(new File(testCase.resultFile), goldenOutput)
+    }
+
+    // Read back the golden file.
+    val expectedOutputs: Seq[QueryOutput] = {
+      val goldenOutput = fileToString(new File(testCase.resultFile))
+      val segments = goldenOutput.split("-- !query.+\n")
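+      // segments(0) is the header; query i then occupies segments(3 * i + 1) (sql),
+      // segments(3 * i + 2) (schema) and segments(3 * i + 3) (output).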
+
+      // Sanity check: each query has 3 segments, plus the header.
+      assert(segments.size == outputs.size * 3 + 1,
+        s"Expected ${outputs.size * 3 + 1} blocks in result file but got ${segments.size}. " +
+        s"Try regenerating the result files.")
+      Seq.tabulate(outputs.size) { i =>
+        QueryOutput(
+          sql = segments(i * 3 + 1).trim,
+          schema = segments(i * 3 + 2).trim,
+          output = segments(i * 3 + 3).trim
+        )
+      }
+    }
+
+    // Compare results.
+    assertResult(expectedOutputs.size, s"Number of queries should be ${expectedOutputs.size}") {
+      outputs.size
+    }
+
+    outputs.zip(expectedOutputs).zipWithIndex.foreach { case ((output, expected), i) =>
+      assertResult(expected.sql, s"SQL query should match for query #$i") { output.sql }
+      assertResult(expected.schema, s"Schema should match for query #$i") { output.schema }
+      assertResult(expected.output, s"Result should match for query #$i") { output.output }
+    }
+  }
+
+  private def listTestCases(): Seq[TestCase] = {
+    listFilesRecursively(new File(inputFilePath)).map { file =>
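+      // e.g. inputs/number-format.sql maps to results/number-format.sql.out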
+      val resultFile = file.getAbsolutePath.replace(inputFilePath, goldenFilePath) + ".out"
+      TestCase(file.getName, file.getAbsolutePath, resultFile)
+    }
+  }
+
+  /** Returns all the files (not directories) in a directory, recursively. */
+  private def listFilesRecursively(path: File): Seq[File] = {
+    val (dirs, files) = path.listFiles().partition(_.isDirectory)
+    files ++ dirs.flatMap(listFilesRecursively)
+  }
+
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    // The timezone is fixed to America/Los_Angeles for timezone-sensitive tests (timestamp_*).
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    // Fix the default locale so test output formatting is deterministic.
+    Locale.setDefault(Locale.US)
+    RuleExecutor.resetTime()
+  }
+
+  override def afterAll(): Unit = {
+    try {
+      TimeZone.setDefault(originalTimeZone)
+      Locale.setDefault(originalLocale)
+
+      // For debugging, dump some statistics about how much time was spent in various optimizer rules.
+      logWarning(RuleExecutor.dumpTimeSpent())
+    } finally {
+      super.afterAll()
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index d8ab864ca6fce648377c0b8b99e3fbb9ead3f300..4e5a51155defde26b0f2613108ddea6047453efd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -41,8 +41,7 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   import testImplicits._
 
   // Used for generating new query answer files by saving
-  private val regenerateGoldenFiles: Boolean =
-    Option(System.getenv("SPARK_GENERATE_GOLDEN_FILES")) == Some("1")
+  private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
   private val goldenSQLPath = "src/test/resources/sqlgen/"
 
   protected override def beforeAll(): Unit = {