Commit de7df7de authored by Zhenhua Wang, committed by Wenchen Fan

[SPARK-17625][SQL] set expectedOutputAttributes when converting SimpleCatalogRelation to LogicalRelation

## What changes were proposed in this pull request?

We should set expectedOutputAttributes when converting a SimpleCatalogRelation to a LogicalRelation; otherwise the output attributes of the LogicalRelation differ from those of the SimpleCatalogRelation because they carry different exprIds.
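
As a rough illustration of the symptom (a sketch, not code from this patch; it only assumes Catalyst's AttributeReference and its default exprId allocation): rebuilding a relation's schema from scratch hands out fresh exprIds, so an attribute resolved against the original SimpleCatalogRelation no longer binds against the converted LogicalRelation.

```scala
// Illustrative sketch only, assuming Spark 2.x Catalyst internals; not part of this patch.
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// The attribute as exposed by the SimpleCatalogRelation's output.
val fromCatalogRelation = AttributeReference("i", LongType)()

// Rebuilding the schema without expectedOutputAttributes allocates a fresh exprId,
// so the "same" column is a different attribute as far as the analyzer is concerned.
val fromLogicalRelation = AttributeReference("i", LongType)()

assert(fromCatalogRelation.name == fromLogicalRelation.name)
assert(fromCatalogRelation.exprId != fromLogicalRelation.exprId)
// An expression resolved against the catalog relation's output therefore fails to
// bind once the plan contains the converted LogicalRelation instead.
```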

## How was this patch tested?

Added a test case in DataFrameSuite.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #15182 from wzhfy/expectedAttributes.
parent 3a80f92f
@@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
  * source information.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
-  private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = {
+  private def readDataSourceTable(
+      sparkSession: SparkSession,
+      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
+    val table = simpleCatalogRelation.catalogTable
     val dataSource =
       DataSource(
         sparkSession,
@@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
     LogicalRelation(
       dataSource.resolveRelation(),
+      expectedOutputAttributes = Some(simpleCatalogRelation.output),
       catalogTable = Some(table))
   }

   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s.metadata))
+      i.copy(table = readDataSourceTable(sparkSession, s))

     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s.metadata)
+      readDataSourceTable(sparkSession, s)
   }
 }
@@ -26,7 +26,8 @@ import scala.util.Random
 import org.scalatest.Matchers._

 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange}
@@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
     val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect
     assert(d.size == d.distinct.size)
   }
+
+  test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") {
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
+      val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
+      val expr = relation.resolve("i")
+      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
+      qe.assertAnalyzed()
+    }
+  }
 }