From f8c6bec65784de89b47e96a367d3f9790c1b3115 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies@databricks.com>
Date: Wed, 21 Oct 2015 13:38:30 -0700
Subject: [PATCH] [SPARK-11197][SQL] run SQL on files directly

This PR introduces a new feature to run SQL directly on files without creating a table, for example:

```
select id from json.`path/to/json/files` as j
```

Author: Davies Liu <davies@databricks.com>

Closes #9173 from davies/source.
---
 R/pkg/inst/tests/test_sparkSQL.R                   |  2 +-
 .../sql/catalyst/analysis/Analyzer.scala           | 10 +++++--
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  3 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala      |  2 +-
 .../scala/org/apache/spark/sql/SQLConf.scala       |  8 ++++++
 .../org/apache/spark/sql/SQLContext.scala          |  2 +-
 .../sql/execution/datasources/rules.scala          | 28 +++++++++++++++++--
 .../org/apache/spark/sql/SQLQuerySuite.scala       | 28 +++++++++++++++++++
 .../apache/spark/sql/hive/HiveContext.scala        |  4 +--
 .../sql/hive/execution/SQLQuerySuite.scala         | 13 +++++++++
 10 files changed, 91 insertions(+), 9 deletions(-)

diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index e1b42b0804..67d8b23cd7 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -1428,7 +1428,7 @@ test_that("sampleBy() on a DataFrame", {
 
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table Not Found: blah", retError), TRUE)
+  expect_equal(grepl("Table not found: blah", retError), TRUE)
 })
 
 test_that("Method as.data.frame as a synonym for collect()", {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 016dc293f4..beabacfc88 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -256,7 +256,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"Table Not Found: ${u.tableName}")
+          u.failAnalysis(s"Table not found: ${u.tableName}")
       }
     }
 
@@ -264,7 +264,13 @@
       case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
         i.copy(table = EliminateSubQueries(getTable(u)))
       case u: UnresolvedRelation =>
-        getTable(u)
+        try {
+          getTable(u)
+        } catch {
+          case _: AnalysisException if u.tableIdentifier.database.isDefined =>
+            // delay the exception into CheckAnalysis, then it could be resolved as data source.
+            u
+        }
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7701fd0451..ab215407f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -49,6 +49,9 @@
     plan.foreachUp {
       case p if p.analyzed => // Skip already analyzed sub-plans
 
+      case u: UnresolvedRelation =>
+        u.failAnalysis(s"Table not found: ${u.tableIdentifier}")
+
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 24af8483a7..0a1fa74bed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -78,7 +78,7 @@ class AnalysisSuite extends AnalysisTest {
 
   test("resolve relations") {
     assertAnalysisError(
-      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
 
     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b08cc8e830..6f2892085a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
   val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
     defaultValue = Some(true), doc = "<TODO>")
 
+  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "When true, we could use `datasource`.`path` as table in SQL query"
+  )
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def dataFrameRetainGroupColumns: Boolean =
     getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e83657a605..a107639947 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -193,7 +193,7 @@ class SQLContext private[sql](
       override val extendedResolutionRules =
         ExtractPythonUDFs ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(
         datasources.PreWriteCheck(catalog)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b00e5680fe..abc016bf02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,13 +17,37 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+      try {
+        val resolved = ResolvedDataSource(
+          sqlContext,
+          userSpecifiedSchema = None,
+          partitionColumns = Array(),
+          provider = u.tableIdentifier.database.get,
+          options = Map("path" -> u.tableIdentifier.table))
+        val plan = LogicalRelation(resolved.relation)
+        u.alias.map(a => Subquery(u.alias.get, plan)).getOrElse(plan)
+      } catch {
+        case e: ClassNotFoundException => u
+        case e: Exception =>
+          // the provider is valid, but failed to create a logical plan
+          u.failAnalysis(e.getMessage)
+      }
+  }
+}
 
 /**
  * A rule to do pre-insert data type casting and field renaming. Before we insert into
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a35a7f41dd..298c322906 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+
+    val e1 = intercept[AnalysisException] {
+      sql("select * from in_valid_table")
+    }
+    assert(e1.message.contains("Table not found"))
+
+    val e2 = intercept[AnalysisException] {
+      sql("select * from no_db.no_table")
+    }
+    assert(e2.message.contains("Table not found"))
+
+    val e3 = intercept[AnalysisException] {
+      sql("select * from json.invalid_file")
+    }
+    assert(e3.message.contains("No input paths specified"))
+  }
+
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
     // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
     // This bug will be triggered when Tungsten is enabled and there are multiple
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 38c195bc7d..61f611638f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.datasources.{ResolveDataSource, DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
@@ -473,7 +473,7 @@ class HiveContext private[hive](
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(
         PreWriteCheck(catalog)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c929ba5068..396150be76 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1281,6 +1281,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.parquet(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from parquet.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
       withTable("jt") {
-- 
GitLab
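A quick usage sketch of the feature this patch adds, mirroring the new tests above; it is not part of the patch itself. The scratch path is hypothetical, and `sqlContext` is assumed to be the pre-built handle available in a Spark shell of this era:

```scala
// Write a small JSON dataset, then query the files directly -- no table registration needed.
val path = "/tmp/sql-on-files-demo"   // hypothetical scratch location
sqlContext.range(100).write.json(path)

// Short provider name, as in the commit message example:
sqlContext.sql(s"select id from json.`$path` where id > 95").show()

// The fully qualified provider name also resolves, as exercised in the new SQLQuerySuite test:
sqlContext.sql(s"select count(*) from `org.apache.spark.sql.json`.`$path`").show()

// The behavior is gated by the internal spark.sql.runSQLOnFiles flag added to SQLConf (default true).
sqlContext.setConf("spark.sql.runSQLOnFiles", "true")
```

The parser puts the provider name in the "database" slot of the table identifier, which is why `ResolveDataSource` only fires when `tableIdentifier.database.isDefined` and falls back to the normal "Table not found" check otherwise.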