Commit f8c6bec6 authored by Davies Liu, committed by Michael Armbrust

[SPARK-11197][SQL] run SQL on files directly

This PR introduces a new feature to run SQL directly on files without creating a table, for example:

```
select id from json.`path/to/json/files` as j
```
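
As a quick illustration (a sketch only, assuming a `SQLContext` named `sqlContext` and a placeholder path), the same kind of query can be issued from Scala:

```scala
// A hypothetical directory of JSON files; any path readable by the JSON data source works.
val path = "/path/to/json/files"

// The text before the dot names the data source ("json"); the backtick-quoted part is
// handed to that source as its "path" option, so no table needs to be registered first.
val ids = sqlContext.sql(s"select id from json.`$path`")
ids.show()
```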

Author: Davies Liu <davies@databricks.com>

Closes #9173 from davies/source.
parent 7c74ebca
Showing 91 additions and 9 deletions
```diff
@@ -1428,7 +1428,7 @@ test_that("sampleBy() on a DataFrame", {
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table Not Found: blah", retError), TRUE)
+  expect_equal(grepl("Table not found: blah", retError), TRUE)
 })

 test_that("Method as.data.frame as a synonym for collect()", {
```
```diff
@@ -256,7 +256,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"Table Not Found: ${u.tableName}")
+          u.failAnalysis(s"Table not found: ${u.tableName}")
       }
     }
@@ -264,7 +264,13 @@ class Analyzer(
       case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
         i.copy(table = EliminateSubQueries(getTable(u)))
       case u: UnresolvedRelation =>
-        getTable(u)
+        try {
+          getTable(u)
+        } catch {
+          case _: AnalysisException if u.tableIdentifier.database.isDefined =>
+            // delay the exception into CheckAnalysis, then it could be resolved as data source.
+            u
+        }
     }
   }
```
```diff
@@ -49,6 +49,9 @@ trait CheckAnalysis {
     plan.foreachUp {
       case p if p.analyzed => // Skip already analyzed sub-plans

+      case u: UnresolvedRelation =>
+        u.failAnalysis(s"Table not found: ${u.tableIdentifier}")
+
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
```
```diff
@@ -78,7 +78,7 @@ class AnalysisSuite extends AnalysisTest {
   test("resolve relations") {
     assertAnalysisError(
-      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))

     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
```
```diff
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
   val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
     defaultValue = Some(true), doc = "<TODO>")

+  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "When true, we could use `datasource`.`path` as table in SQL query"
+  )
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)

+  private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
   /** ********************** SQLConf functionality methods ************ */

   /** Set Spark SQL configuration properties. */
```
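
A minimal sketch of turning the feature off, assuming a `SQLContext` named `sqlContext`; since the extended rule list is assembled when the analyzer is constructed, the flag should be set before the first query is run:

```scala
// spark.sql.runSQLOnFiles is an internal flag (isPublic = false) that defaults to true.
// With it set to false, ResolveDataSource is not added to the extended resolution rules,
// so a `source`.`path` relation fails analysis with "Table not found" instead.
sqlContext.setConf("spark.sql.runSQLOnFiles", "false")
```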
```diff
@@ -193,7 +193,7 @@ class SQLContext private[sql](
       override val extendedResolutionRules =
         ExtractPythonUDFs ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)

       override val extendedCheckRules = Seq(
         datasources.PreWriteCheck(catalog)
```
```diff
@@ -17,13 +17,37 @@
 package org.apache.spark.sql.execution.datasources

-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+      try {
+        val resolved = ResolvedDataSource(
+          sqlContext,
+          userSpecifiedSchema = None,
+          partitionColumns = Array(),
+          provider = u.tableIdentifier.database.get,
+          options = Map("path" -> u.tableIdentifier.table))
+        val plan = LogicalRelation(resolved.relation)
+        u.alias.map(a => Subquery(u.alias.get, plan)).getOrElse(plan)
+      } catch {
+        case e: ClassNotFoundException => u
+        case e: Exception =>
+          // the provider is valid, but failed to create a logical plan
+          u.failAnalysis(e.getMessage)
+      }
+  }
+}

 /**
  * A rule to do pre-insert data type casting and field renaming. Before we insert into
```
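
To make the resolution above concrete, here is a small sketch (the path is a placeholder) of how a file query's identifier reaches the new rule: the data source name is parsed as the "database" part of the table identifier and the backtick-quoted path as the "table" part, which `ResolveDataSource` maps to a provider name and a `path` option:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// "select id from json.`/data/events`" produces an UnresolvedRelation whose identifier is:
val ident = TableIdentifier("/data/events", Some("json"))

// ResolveDataSource reads it as
//   provider = ident.database.get          // "json" (the data source short name)
//   options  = Map("path" -> ident.table)  // "/data/events"
// and replaces the UnresolvedRelation with a LogicalRelation over the resolved source.
```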
```diff
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+
+    val e1 = intercept[AnalysisException] {
+      sql("select * from in_valid_table")
+    }
+    assert(e1.message.contains("Table not found"))
+
+    val e2 = intercept[AnalysisException] {
+      sql("select * from no_db.no_table")
+    }
+    assert(e2.message.contains("Table not found"))
+
+    val e3 = intercept[AnalysisException] {
+      sql("select * from json.invalid_file")
+    }
+    assert(e3.message.contains("No input paths specified"))
+  }
+
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
     // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
     // This bug will be triggered when Tungsten is enabled and there are multiple
```
```diff
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.datasources.{ResolveDataSource, DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
@@ -473,7 +473,7 @@ class HiveContext private[hive](
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)

       override val extendedCheckRules = Seq(
         PreWriteCheck(catalog)
```
```diff
@@ -1281,6 +1281,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }

+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.parquet(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from parquet.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
       withTable("jt") {
```