diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c8ad5b303491f0042b5a39f7a100a0395c01dcae..63f01c5bb9e3caca5cfd5f16351ef4eb783c73f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { * source information. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { + private def readDataSourceTable( + sparkSession: SparkSession, + simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = { + val table = simpleCatalogRelation.catalogTable val dataSource = DataSource( sparkSession, @@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation( dataSource.resolveRelation(), + expectedOutputAttributes = Some(simpleCatalogRelation.output), catalogTable = Some(table)) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(sparkSession, s.metadata)) + i.copy(table = readDataSourceTable(sparkSession, s)) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(sparkSession, s.metadata) + readDataSourceTable(sparkSession, s) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c2d256bdd335b55232b010342d27742721a7f50f..2c60a7dd9209b9a9b4ff58825e6d62ad7a260b3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -26,7 +26,8 @@ import scala.util.Random import org.scalatest.Matchers._ import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange} @@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect assert(d.size == d.distinct.size) } + + test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") { + val tableName = "tbl" + withTable(tableName) { + spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName) + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)) + val expr = relation.resolve("i") + val qe = spark.sessionState.executePlan(Project(Seq(expr), relation)) + qe.assertAnalyzed() + } + } }