From c8c4441dfdfeda22f8d92e25aee1b6a6269752f9 Mon Sep 17 00:00:00 2001
From: Ryan Blue <blue@apache.org>
Date: Wed, 21 Feb 2018 15:10:08 +0800
Subject: [PATCH] [SPARK-23418][SQL] Fail DataSourceV2 reads when user schema
 is passed, but not supported.

## What changes were proposed in this pull request?

DataSourceV2 initially allowed user-supplied schemas when a source doesn't implement `ReadSupportWithSchema`, as long as the schema was identical to the source's schema. This is confusing behavior because changes to an underlying table can cause a previously working job to fail with an exception that user-supplied schemas are not allowed.

This reverts commit adcb25a0624, which was added to #20387 so that it could be removed in a separate JIRA issue and PR.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #20603 from rdblue/SPARK-23418-revert-adcb25a0624.
---
 .../datasources/v2/DataSourceV2Relation.scala       | 13 +------------
 1 file changed, 1 insertion(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index a98dd4866f..cc6cb631e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -174,13 +174,6 @@ object DataSourceV2Relation {
       v2Options: DataSourceOptions,
       userSchema: Option[StructType]): StructType = {
     val reader = userSchema match {
-      // TODO: remove this case because it is confusing for users
-      case Some(s) if !source.isInstanceOf[ReadSupportWithSchema] =>
-        val reader = source.asReadSupport.createReader(v2Options)
-        if (reader.readSchema() != s) {
-          throw new AnalysisException(s"${source.name} does not allow user-specified schemas.")
-        }
-        reader
       case Some(s) =>
         source.asReadSupportWithSchema.createReader(s, v2Options)
       case _ =>
@@ -195,11 +188,7 @@ object DataSourceV2Relation {
       filters: Option[Seq[Expression]] = None,
       userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
     val projection = schema(source, makeV2Options(options), userSpecifiedSchema).toAttributes
-    DataSourceV2Relation(source, options, projection, filters,
-      // if the source does not implement ReadSupportWithSchema, then the userSpecifiedSchema must
-      // be equal to the reader's schema. the schema method enforces this. because the user schema
-      // and the reader's schema are identical, drop the user schema.
-      if (source.isInstanceOf[ReadSupportWithSchema]) userSpecifiedSchema else None)
+    DataSourceV2Relation(source, options, projection, filters, userSpecifiedSchema)
   }
 
   private def pushRequiredColumns(reader: DataSourceReader, struct: StructType): Unit = {
-- 
GitLab