diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9bd3975405c389f03de93fdb999a34ca374b5430..9244c5621b2d335fa2b6761d0d88b26560f54e90 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1858,7 +1858,7 @@ test_that("approxQuantile() on a DataFrame", {
 
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table or View not found", retError), TRUE)
+  expect_equal(grepl("Table or view not found", retError), TRUE)
   expect_equal(grepl("blah", retError), TRUE)
 })
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 6e798a53adf2702d439add89b2c55fed67b48f30..179dab11a2b51848bc2d5ba84db2cfaec4d0fa70 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -32,6 +32,8 @@ trait CatalystConf {
   def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
+  def runSQLonFile: Boolean
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
@@ -49,6 +51,6 @@ case class SimpleCatalystConf(
     groupByOrdinal: Boolean = true,
     optimizerMaxIterations: Int = 100,
     optimizerInSetConversionThreshold: Int = 10,
-    maxCaseBranchesForCodegen: Int = 20)
-  extends CatalystConf {
-}
+    maxCaseBranchesForCodegen: Int = 20,
+    runSQLonFile: Boolean = true)
+  extends CatalystConf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 24558d5b8c55e87093a6a2faf76fba826364ba57..50957e8661b1e7e79ff18a05bbbc68506f91e4ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -412,7 +412,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"Table or View not found: ${u.tableName}")
+          u.failAnalysis(s"Table or view not found: ${u.tableName}")
       }
     }
 
@@ -420,12 +420,18 @@ class Analyzer(
       case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
         i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
       case u: UnresolvedRelation =>
-        try {
+        val table = u.tableIdentifier
+        if (table.database.isDefined && conf.runSQLonFile &&
+            (!catalog.databaseExists(table.database.get) || !catalog.tableExists(table))) {
+          // If the table does not exist, and the database part is specified, and we support
+          // running SQL directly on files, then let's just return the original UnresolvedRelation.
+          // It is possible we are matching a query like "select * from parquet.`/path/to/query`".
+          // The plan will get resolved later.
+          // Note that we are testing (!db_exists || !table_exists) because the catalog throws
+          // an exception from tableExists if the database does not exist.
+          u
+        } else {
           lookupTableFromCatalog(u)
-        } catch {
-          case _: AnalysisException if u.tableIdentifier.database.isDefined =>
-            // delay the exception into CheckAnalysis, then it could be resolved as data source.
-            u
         }
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a50b9a1e1a9d8a8d4d57dd2d5f8c1ae2cff31426..6b737d6b78221e07df7447013cc46e8088e2a6e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -52,7 +52,7 @@ trait CheckAnalysis extends PredicateHelper {
       case p if p.analyzed => // Skip already analyzed sub-plans
 
       case u: UnresolvedRelation =>
-        u.failAnalysis(s"Table or View not found: ${u.tableIdentifier}")
+        u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
 
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 36f4f29068717722f0dc693cb72523733e9aded4..b8f0e458fa4e48b43b0c24e1ae1c40721f0d8929 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -62,7 +62,7 @@ class InMemoryCatalog extends ExternalCatalog {
   private def requireTableExists(db: String, table: String): Unit = {
     if (!tableExists(db, table)) {
       throw new AnalysisException(
-        s"Table or View not found: '$table' does not exist in database '$db'")
+        s"Table or view not found: '$table' does not exist in database '$db'")
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c1bd51632d8737d6337c54b0a5cf2326383e4286..acd85db7c067c7076a090481ba64835f655ea95d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -642,7 +642,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
-  def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+  override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
   def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c423b84957c026249297c304fea2de5d61324e92..04ad92074927df3c116d3582114ee3c81991d1c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -98,7 +98,7 @@ private[sql] class SessionState(ctx: SQLContext) {
       override val extendedResolutionRules =
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+        (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 84f0c0083b3bf16e328fd5dc241e9eac584f9c7e..29521afdd8e9d18ec59ee2bbf3f1d4b0da225be8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1825,12 +1825,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e1 = intercept[AnalysisException] {
       sql("select * from in_valid_table")
     }
-    assert(e1.message.contains("Table or View not found"))
+    assert(e1.message.contains("Table or view not found"))
 
     val e2 = intercept[AnalysisException] {
       sql("select * from no_db.no_table").show()
     }
-    assert(e2.message.contains("Table or View not found"))
+    assert(e2.message.contains("Table or view not found"))
 
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index cc8b41542ed689337c5d0c5624bb0d5cb3e090cc..3fa2f884e2c4c7626026f146be1d12fb5c7ae809 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -230,7 +230,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     runCliWithin(timeout = 2.minute,
       errorResponses = Seq("AnalysisException"))(
       "select * from nonexistent_table;"
-        -> "Error in query: Table or View not found: nonexistent_table;"
+        -> "Error in query: Table or view not found: nonexistent_table;"
     )
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index b0877823c008b622345034770387b1bf950d04f9..a22e19207e5c185c7eee64805c426777cc1f7c3a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -91,7 +91,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
         catalog.PreInsertionCasts ::
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
-        (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil)
+        (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }