Commit 93b8ad18 authored by gatorsmile

[SPARK-17693][SQL] Fixed Insert Failure To Data Source Tables when the Schema has the Comment Field

### What changes were proposed in this pull request?
```SQL
CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet
INSERT INTO TABLE tab1 SELECT 1, 2
```
The INSERT fails when the target table has a column with a comment, and the error it raises is confusing to external users:
```
assertion failed: No plan for InsertIntoTable Relation[col1#15,col2#16] parquet, false, false
+- Project [1 AS col1#19, 2 AS col2#20]
   +- OneRowRelation$
```

This PR fixes the above bug by also checking the column metadata when comparing the table schema with the query schema. When they do not match, the expected metadata is copied into the new alias as well. This is an alternative to https://github.com/apache/spark/pull/15266
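
For context, a minimal standalone sketch (editorial, not part of the patch) of why the two schemas were treated as different: a column `COMMENT` is stored in the field's `Metadata` (under the `comment` key), so the table attribute and the query attribute can agree on name and type and still differ in metadata. The field values below are illustrative only.

```scala
import org.apache.spark.sql.types._

// Field as declared in the table: the COMMENT is stored in the field metadata.
val tableField = StructField("col1", IntegerType,
  nullable = true,
  metadata = new MetadataBuilder().putString("comment", "a").build())

// Field as produced by `SELECT 1`: same name and type, but empty metadata.
val queryField = StructField("col1", IntegerType)

println(tableField.name == queryField.name)         // true
println(tableField.dataType == queryField.dataType) // true
println(tableField.metadata == queryField.metadata) // false -- this mismatch made the old check reject the plan
```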

### How was this patch tested?
Added test cases to `InsertSuite`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #15615 from gatorsmile/insertDataSourceTableWithCommentSolution2.
parent 12b3e8d2
```diff
@@ -248,10 +248,16 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] {
       expectedOutput: Seq[Attribute]): InsertIntoTable = {
     val newChildOutput = expectedOutput.zip(insert.child.output).map {
       case (expected, actual) =>
-        if (expected.dataType.sameType(actual.dataType) && expected.name == actual.name) {
+        if (expected.dataType.sameType(actual.dataType) &&
+            expected.name == actual.name &&
+            expected.metadata == actual.metadata) {
           actual
         } else {
-          Alias(Cast(actual, expected.dataType), expected.name)()
+          // Renaming is needed for handling the following cases like
+          // 1) Column names/types do not match, e.g., INSERT INTO TABLE tab1 SELECT 1, 2
+          // 2) Target tables have column metadata
+          Alias(Cast(actual, expected.dataType), expected.name)(
+            explicitMetadata = Option(expected.metadata))
         }
     }
```
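
As a quick sanity check of the effect of this change, a hedged spark-shell sketch (editorial, not part of the patch; assumes an active `SparkSession` named `spark` and reuses the table name from the example above):

```scala
spark.sql("CREATE TABLE tab1(col1 int COMMENT 'a', col2 int) USING parquet")

// Previously failed with "assertion failed: No plan for InsertIntoTable ...".
spark.sql("INSERT INTO TABLE tab1 SELECT 1, 2")

spark.sql("SELECT col1, col2 FROM tab1").show()

// The comment should still be attached to col1's field metadata after the insert.
println(spark.table("tab1").schema("col1").metadata)
```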
```diff
@@ -185,6 +185,48 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
     )
   }
 
+  test("INSERT INTO TABLE with Comment in columns") {
+    val tabName = "tab1"
+    withTable(tabName) {
+      sql(
+        s"""
+           |CREATE TABLE $tabName(col1 int COMMENT 'a', col2 int)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tabName SELECT 1, 2")
+      checkAnswer(
+        sql(s"SELECT col1, col2 FROM $tabName"),
+        Row(1, 2) :: Nil
+      )
+    }
+  }
+
+  test("INSERT INTO TABLE - complex type but different names") {
+    val tab1 = "tab1"
+    val tab2 = "tab2"
+    withTable(tab1, tab2) {
+      sql(
+        s"""
+           |CREATE TABLE $tab1 (s struct<a: string, b: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab1 SELECT named_struct('col1','1','col2','2')")
+
+      sql(
+        s"""
+           |CREATE TABLE $tab2 (p struct<c: string, d: string>)
+           |USING parquet
+         """.stripMargin)
+      sql(s"INSERT INTO TABLE $tab2 SELECT * FROM $tab1")
+
+      checkAnswer(
+        spark.table(tab1),
+        spark.table(tab2)
+      )
+    }
+  }
+
   test("it is not allowed to write to a table while querying it.") {
     val message = intercept[AnalysisException] {
       sql(
```