From 7374e518e2641fddfe57003340db410224b37581 Mon Sep 17 00:00:00 2001
From: gatorsmile <gatorsmile@gmail.com>
Date: Sat, 9 Jul 2016 20:35:45 +0800
Subject: [PATCH] [SPARK-16401][SQL] Data Source API: Enable Extending
 RelationProvider and CreatableRelationProvider without Extending
 SchemaRelationProvider

#### What changes were proposed in this pull request?
When users implement a data source by extending only `RelationProvider` and `CreatableRelationProvider` (without `SchemaRelationProvider`), they hit an error when resolving the relation:
```Scala
spark.read
  .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .load()
  .write
  .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
  .save()
```

The error they hit looks like this:
```
org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
org.apache.spark.sql.AnalysisException: org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema does not allow user-specified schemas.;
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:319)
	at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:494)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
```
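The exception comes from the pattern match in `DataSource.resolveRelation` (the frame at `DataSource.scala:319` above): the write path unconditionally copied the written DataFrame's schema into `userSpecifiedSchema`, and `resolveRelation` rejects any user-specified schema for a provider that only extends `RelationProvider`. A simplified sketch of that check (abbreviated from memory of `DataSource.scala` at this version; surrounding code and the remaining cases are omitted):
```Scala
// Simplified sketch of the check in DataSource.resolveRelation that raises
// the error above. Other cases and surrounding code are omitted.
(providingClass.newInstance(), userSpecifiedSchema) match {
  case (dataSource: SchemaRelationProvider, Some(schema)) =>
    dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
  case (dataSource: RelationProvider, None) =>
    dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
  case (_: RelationProvider, Some(_)) =>
    // A plain RelationProvider cannot accept a schema, so the schema set by
    // the write path lands here and throws.
    throw new AnalysisException(s"$className does not allow user-specified schemas.")
  // ... FileFormat and remaining cases omitted ...
}
```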

The fix is simple. [`DataSource.createRelation(sparkSession.sqlContext, mode, options, data)`](https://github.com/gatorsmile/spark/blob/dd644f8117e889cebd6caca58702a7c7e3d88bef/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L429) already returns a `BaseRelation`, so the write path should not copy the written DataFrame's schema into `userSpecifiedSchema` before re-resolving the relation. That schema replacement only makes sense for data sources that extend `FileFormat`, as sketched below.
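For reference, a simplified sketch of the relevant part of `DataSource.write` after this change (plan construction and details are abbreviated; see the diff below for the exact code):
```Scala
// Simplified sketch of DataSource.write after the fix; details abbreviated.
providingClass.newInstance() match {
  case dataSource: CreatableRelationProvider =>
    // Already returns a BaseRelation; no schema replacement is needed.
    dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
  case format: FileFormat =>
    // ... build and run the write plan ...
    sparkSession.sessionState.executePlan(plan).toRdd
    // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
  case _ =>
    sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
}
```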

#### How was this patch tested?
Added a test case to `DataFrameReaderWriterSuite` that reads from and writes to a source extending only `RelationProvider` and `CreatableRelationProvider`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #14075 from gatorsmile/dataSource.
---
 .../execution/datasources/DataSource.scala    |  5 ++-
 .../sql/test/DataFrameReaderWriterSuite.scala | 32 +++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 6dc27c1952..f572b93991 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -485,12 +485,11 @@ case class DataSource(
             data.logicalPlan,
             mode)
         sparkSession.sessionState.executePlan(plan).toRdd
+        // Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
+        copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
 
       case _ =>
         sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
     }
-
-    // We replace the schema with that of the DataFrame we just wrote out to avoid re-inferring it.
-    copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index d454100ccb..05935cec4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -82,6 +82,29 @@ class DefaultSource
   }
 }
 
+/** Dummy provider with only RelationProvider and CreatableRelationProvider. */
+class DefaultSourceWithoutUserSpecifiedSchema
+  extends RelationProvider
+  with CreatableRelationProvider {
+
+  case class FakeRelation(sqlContext: SQLContext) extends BaseRelation {
+    override def schema: StructType = StructType(Seq(StructField("a", StringType)))
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      mode: SaveMode,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    FakeRelation(sqlContext)
+  }
+}
 
 class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
 
@@ -120,6 +143,15 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
       .save()
   }
 
+  test("resolve default source without extending SchemaRelationProvider") {
+    spark.read
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .load()
+      .write
+      .format("org.apache.spark.sql.test.DefaultSourceWithoutUserSpecifiedSchema")
+      .save()
+  }
+
   test("resolve full class") {
     spark.read
       .format("org.apache.spark.sql.test.DefaultSource")
-- 
GitLab