[SPARK-20460][SQL] Make it more consistent to handle column name duplication
## What changes were proposed in this pull request?

This PR makes the handling of column name duplication more consistent. In the current master, the error raised for duplicate column names differs by code path:

```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#12, a#13.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Duplicate column(s) : "a" found, cannot save to JSON format;
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.checkConstraints(JsonDataSource.scala:81)
  at org.apache.spark.sql.execution.datasources.json.JsonDataSource.inferSchema(JsonDataSource.scala:63)
  at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.inferSchema(JsonFileFormat.scala:57)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:176)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#41, a#42.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)

// If `inferSchema` is true, a CSV format is duplicate-safe (See SPARK-16896)
scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Reference 'a' is ambiguous, could be: a#110, a#111.;
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:287)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:181)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:153)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolve$1.apply(LogicalPlan.scala:152)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
```

With this patch applied, the results change to:

```
// json
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("json").schema(schema).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)

scala> spark.read.format("json").load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:156)

// csv
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq("a,a", "1,1").toDF().coalesce(1).write.mode("overwrite").text("/tmp/data")
scala> spark.read.format("csv").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)

scala> spark.read.format("csv").option("header", true).load("/tmp/data").show
+---+---+
| a0| a1|
+---+---+
|  1|  1|
+---+---+

// parquet
scala> val schema = StructType(StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
scala> Seq((1, 1)).toDF("a", "b").coalesce(1).write.mode("overwrite").parquet("/tmp/data")
scala> spark.read.format("parquet").schema(schema).option("header", false).load("/tmp/data").show
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in datasource: "a";
  at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtil.scala:47)
  at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtil.scala:33)
  at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:186)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:368)
```

## How was this patch tested?

Added tests in `DataFrameReaderWriterSuite` and `SQLQueryTestSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #17758 from maropu/SPARK-20460.
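The stack traces above show that every path now funnels through one helper, `SchemaUtils.checkColumnNameDuplication`. The sketch below illustrates the idea of such a centralized check; the exact signature, the `colType` parameter, and the use of `IllegalArgumentException` in place of Spark's `AnalysisException` are assumptions made to keep the example self-contained, not the PR's actual code:

```scala
// Illustrative sketch only: approximates what a centralized duplicate-name
// check can look like. The real helper lives in
// sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala
// and throws AnalysisException.
object DuplicateColumnCheck {
  def checkColumnNameDuplication(
      columnNames: Seq[String],
      colType: String,
      caseSensitiveAnalysis: Boolean): Unit = {
    // Under case-insensitive analysis (Spark's default), "a" and "A" collide.
    val names =
      if (caseSensitiveAnalysis) columnNames else columnNames.map(_.toLowerCase)
    if (names.distinct.length != names.length) {
      val dups = names.groupBy(identity).collect {
        case (name, occurrences) if occurrences.size > 1 => s""""$name""""
      }
      throw new IllegalArgumentException(
        s"Found duplicate column(s) $colType: ${dups.mkString(", ")}")
    }
  }
}

// e.g. DuplicateColumnCheck.checkColumnNameDuplication(
//   Seq("a", "a"), "in datasource", caseSensitiveAnalysis = false)
// => Found duplicate column(s) in datasource: "a"
```

Funneling every duplication check through one helper is what lets the JSON, CSV, Parquet, JDBC, and DDL/view paths all report the identical `Found duplicate column(s)` message.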
Showing 19 changed files with 382 additions and 119 deletions
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala (1 addition, 15 deletions)
- sql/catalyst/src/main/scala/org/apache/spark/sql/util/SchemaUtils.scala (47 additions, 11 deletions)
- sql/catalyst/src/test/scala/org/apache/spark/sql/util/SchemaUtilsSuite.scala (83 additions, 0 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala (0 additions, 2 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala (6 additions, 2 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala (5 additions, 4 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala (37 additions, 6 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala (5 additions, 9 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala (3 additions, 7 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala (3 additions, 8 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala (1 addition, 14 deletions)
- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala (18 additions, 18 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (2 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala (40 additions, 16 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala (2 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala (3 additions, 2 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala (37 additions, 0 deletions)
- sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala (88 additions, 0 deletions)
- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala (1 addition, 1 deletion)
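The 88 lines added to `DataFrameReaderWriterSuite` above exercise the new uniform error end to end. As a rough, self-contained sketch of what such a test looks like, assuming a plain ScalaTest `FunSuite`, a local `SparkSession`, and an illustrative suite name (not the PR's exact test code):

```scala
import java.nio.file.Files

import org.scalatest.FunSuite

import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical suite name; Spark's real tests mix in shared-session traits instead.
class DuplicateColumnSuite extends FunSuite {
  test("json read rejects a user-specified schema with duplicate column names") {
    val spark = SparkSession.builder().master("local[1]").appName("dup").getOrCreate()
    import spark.implicits._
    try {
      val dir = Files.createTempDirectory("dup-cols").toString + "/data"
      val schema = StructType(
        StructField("a", IntegerType) :: StructField("a", IntegerType) :: Nil)
      // Write one JSON line with a duplicated key, then read it back with a
      // duplicated user schema; the patched reader fails fast at load time.
      Seq("""{"a":1, "a":1}""").toDF().coalesce(1).write.mode("overwrite").text(dir)
      val e = intercept[AnalysisException] {
        spark.read.format("json").schema(schema).load(dir)
      }
      assert(e.getMessage.contains("Found duplicate column(s)"))
    } finally {
      spark.stop()
    }
  }
}
```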