Commit 3a80f92f authored by gatorsmile, committed by Wenchen Fan

[SPARK-17492][SQL] Fix Reading Cataloged Data Sources without Extending SchemaRelationProvider

### What changes were proposed in this pull request?
For data sources that do not extend `SchemaRelationProvider`, we expect users not to specify schemas when they create tables. If a schema is supplied by the user, an exception is thrown.
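
For reference, the two provider interfaces in `org.apache.spark.sql.sources` differ only in whether `createRelation` accepts a user-specified schema; a condensed sketch (the real traits carry more documentation):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType

// Sources that infer their own schema implement only this trait.
trait RelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation
}

// Sources that can honor a user-specified schema implement this one.
trait SchemaRelationProvider {
  def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation
}
```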

Since Spark 2.1, for any data source, to avoid inferring the schema every time, we store the schema in the metastore catalog. Thus, when reading a cataloged data source table, the schema can be read from the metastore catalog. In this case, we also get an exception. For example,

```Scala
sql(
  """
    |CREATE TABLE relationProvierWithSchema
    |USING org.apache.spark.sql.sources.SimpleScanSource
    |OPTIONS (
    |  From '1',
    |  To '10'
    |)
  """.stripMargin)
spark.table("relationProvierWithSchema").show()
```
```
org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas.;
```

This PR fixes the above issue. When building a data source, we introduce a flag `isSchemaFromUsers` to indicate whether the schema was really input by users. If true, we issue an exception; otherwise, we call `RelationProvider`'s `createRelation` to generate the `BaseRelation`, which contains the actual schema.

### How was this patch tested?
Added a few test cases.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15046 from gatorsmile/tempViewCases.
parent cb324f61
```diff
@@ -333,8 +333,13 @@ case class DataSource(
         dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when using $className.")
-      case (_: RelationProvider, Some(_)) =>
-        throw new AnalysisException(s"$className does not allow user-specified schemas.")
+      case (dataSource: RelationProvider, Some(schema)) =>
+        val baseRelation =
+          dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
+        if (baseRelation.schema != schema) {
+          throw new AnalysisException(s"$className does not allow user-specified schemas.")
+        }
+        baseRelation
 
       // We are reading from the results of a streaming query. Load files from the metadata log
       // instead of listing them using HDFS APIs.
```
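
With this change, the reproduction from the description succeeds: the schema stored in the metastore was originally inferred from the same relation, so it equals `baseRelation.schema` and the check passes, while a genuinely different user-specified schema still raises the exception. For example:

```scala
sql(
  """
    |CREATE TABLE relationProvierWithSchema
    |USING org.apache.spark.sql.sources.SimpleScanSource
    |OPTIONS (From '1', To '10')
  """.stripMargin)
// Now succeeds instead of throwing
// "does not allow user-specified schemas".
spark.table("relationProvierWithSchema").show()
```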
```diff
@@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     )
   }
 
+  test("insert into a temp view that does not point to an insertable data source") {
+    import testImplicits._
+    withTempView("t1", "t2") {
+      sql(
+        """
+          |CREATE TEMPORARY VIEW t1
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10')
+        """.stripMargin)
+      sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2")
+      val message = intercept[AnalysisException] {
+        sql("INSERT INTO TABLE t1 SELECT a FROM t2")
+      }.getMessage
+      assert(message.contains("does not allow insertion"))
+    }
+  }
+
   test("PreInsert casting and renaming") {
     sql(
       s"""
```
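
For context, the "does not allow insertion" message arises because the relation produced by `SimpleScanSource` is read-only: it never mixes in Spark's `InsertableRelation` trait, which is what `INSERT INTO` resolution looks for. The trait is essentially:

```scala
import org.apache.spark.sql.DataFrame

// Condensed from org.apache.spark.sql.sources: relations that support
// INSERT INTO implement this; SimpleScanSource's relation does not.
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```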
```diff
@@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with SharedSQLContext {
   test("exceptions") {
     // Make sure we do throw correct exception when users use a relation provider that
     // only implements the RelationProvider or the SchemaRelationProvider.
-    val schemaNotAllowed = intercept[Exception] {
-      sql(
-        """
-          |CREATE TEMPORARY VIEW relationProvierWithSchema (i int)
-          |USING org.apache.spark.sql.sources.SimpleScanSource
-          |OPTIONS (
-          |  From '1',
-          |  To '10'
-          |)
-        """.stripMargin)
-    }
-    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
+    Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+      val schemaNotAllowed = intercept[Exception] {
+        sql(
+          s"""
+             |CREATE $tableType relationProvierWithSchema (i int)
+             |USING org.apache.spark.sql.sources.SimpleScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+      }
+      assert(schemaNotAllowed.getMessage.contains("does not allow user-specified schemas"))
 
-    val schemaNeeded = intercept[Exception] {
-      sql(
-        """
-          |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema
-          |USING org.apache.spark.sql.sources.AllDataTypesScanSource
-          |OPTIONS (
-          |  From '1',
-          |  To '10'
-          |)
-        """.stripMargin)
+      val schemaNeeded = intercept[Exception] {
+        sql(
+          s"""
+             |CREATE $tableType schemaRelationProvierWithoutSchema
+             |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+      }
+      assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
     }
-    assert(schemaNeeded.getMessage.contains("A schema needs to be specified when using"))
+  }
+
+  test("read the data source tables that do not extend SchemaRelationProvider") {
+    Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+      val tableName = "relationProvierWithSchema"
+      withTable(tableName) {
+        sql(
+          s"""
+             |CREATE $tableType $tableName
+             |USING org.apache.spark.sql.sources.SimpleScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+        checkAnswer(spark.table(tableName), spark.range(1, 11).toDF())
+      }
+    }
   }
 
   test("SPARK-5196 schema field with comment") {
```
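
`SimpleScanSource` itself is a test-only provider defined alongside these suites. A sketch of what it looks like, assumed from its usage here (a single int column `i` spanning `From`..`To`, which is why the tests compare against `spark.range(1, 11)`); the actual fixture may differ in detail:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class SimpleScanSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    // DataSource passes options through a case-insensitive map,
    // so "From"/"To" in SQL and "from"/"TO" in the reader both resolve.
    SimpleScan(parameters("from").toInt, parameters("to").toInt)(sqlContext)
}

case class SimpleScan(from: Int, to: Int)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // The schema recorded in the metastore: one non-nullable int column "i".
  override def schema: StructType =
    StructType(StructField("i", IntegerType, nullable = false) :: Nil)

  // One row per value in [from, to].
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(from to to).map(Row(_))
}
```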
```diff
@@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
   }
 
+  test("read a data source that does not extend SchemaRelationProvider") {
+    val dfReader = spark.read
+      .option("from", "1")
+      .option("TO", "10")
+      .format("org.apache.spark.sql.sources.SimpleScanSource")
+
+    // when users do not specify the schema
+    checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
+
+    // when users specify the schema
+    val inputSchema = new StructType().add("s", IntegerType, nullable = false)
+    val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() }
+    assert(e.getMessage.contains(
+      "org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified schemas"))
+  }
+
+  test("read a data source that does not extend RelationProvider") {
+    val dfReader = spark.read
+      .option("from", "1")
+      .option("TO", "10")
+      .option("option_with_underscores", "someval")
+      .option("option.with.dots", "someval")
+      .format("org.apache.spark.sql.sources.AllDataTypesScanSource")
+
+    // when users do not specify the schema
+    val e = intercept[AnalysisException] { dfReader.load() }
+    assert(e.getMessage.contains("A schema needs to be specified when using"))
+
+    // when users specify the schema
+    val inputSchema = new StructType().add("s", StringType, nullable = false)
+    assert(dfReader.schema(inputSchema).load().count() == 10)
+  }
+
   test("text - API and behavior regarding schema") {
     // Writer
     spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir)
```