Commit 7374e518 authored by gatorsmile, committed by Wenchen Fan


[SPARK-16401][SQL] Data Source API: Enable Extending RelationProvider and CreatableRelationProvider without Extending SchemaRelationProvider

#### What changes were proposed in this pull request?
When users implement a data source by extending only `RelationProvider` and `CreatableRelationProvider`, they hit an error when resolving the relation:
```Scala
spark.read
  .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .load()
  .write
  .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .save()
```

The error they hit looks like this:
```
org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:319)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:494)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
```
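
For context, `DefaultSourceWithoutUserSpecifiedSchema` used above is a dummy source that extends only the two provider traits. Roughly, such a source looks like the class this patch adds to the test suite (a sketch with the imports spelled out; the single-column schema of `FakeRelation` is just a placeholder):

```Scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Extends only RelationProvider (read path) and CreatableRelationProvider (write path),
// i.e. neither SchemaRelationProvider nor FileFormat.
class DefaultSourceWithoutUserSpecifiedSchema
  extends RelationProvider
  with CreatableRelationProvider {

  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
  }

  // Read path: build the relation from options alone; no user-specified schema is involved.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    FakeRelation(sqlContext)
  }

  // Write path: persist the data (a no-op here) and return the relation to use afterwards.
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    FakeRelation(sqlContext)
  }
}
```

With only these two traits, the read in the snippet above goes through the first `createRelation` and the write through the second; the user never supplies a schema.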

The fix is simple: [`DataSource.createRelation(sparkSession.sqlContext, mode, options, data)`](https://github.com/gatorsmile/spark/blob/dd644f8117e889cebd6caca58702a7c7e3d88bef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L429) already returns a `BaseRelation`, so we should not assign the written DataFrame's schema to `userSpecifiedSchema` and re-resolve the relation. That schema assignment only makes sense for data sources that extend `FileFormat`.
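
To make that contract concrete: the write-path `createRelation` returns the `BaseRelation` callers should keep using. A hypothetical standalone snippet (not part of the patch; `WriteRelationDemo` and the local session setup are illustrative, and it reuses the provider sketch above):

```Scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.sources.BaseRelation

object WriteRelationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._
    val df = Seq("x", "y").toDF("a")

    // The write-path createRelation already returns the BaseRelation to keep using,
    // so DataSource.write has no reason to re-resolve it with a user-specified schema.
    val source = new DefaultSourceWithoutUserSpecifiedSchema
    val relation: BaseRelation = source.createRelation(
      spark.sqlContext, SaveMode.ErrorIfExists, Map.empty[String, String], df)
    assert(relation.schema.fieldNames.sameElements(Array("a")))

    spark.stop()
  }
}
```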

#### How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14075 from gatorsmile/dataSource.

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala:

```diff
@@ -485,12 +485,11 @@ case class DataSource(
             data.logicalPlan,
             mode)
           sparkSession.sessionState.executePlan(plan).toRdd
+          // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+          copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
         case _ =>
           sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
       }
-
-    // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
   }
 }
```

sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala:

```diff
@@ -82,6 +82,29 @@ class DefaultSource
   }
 }
 
+/** Dummy provider with only RelationProvider and CreatableRelationProvider. */
+class DefaultSourceWithoutUserSpecifiedSchema
+  extends RelationProvider
+  with CreatableRelationProvider {
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+}
+
 class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
@@ -120,6 +143,15 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
       .save()
   }
 
+  test("resolve default source without extending SchemaRelationProvider") {
+    spark.read
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .load()
+      .write
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .save()
+  }
+
   test("resolve full class") {
     spark.read
       .format("org.apache.spark.sql.test.DefaultSource")
```