diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 014abd454f5c0f79d61c9d186c3d0b1a3f4a6844..9a08524476baa8765db08df9c2f813b5d2c4fcbf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 
 
 /**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
   override def sqlContext: SQLContext = sparkSession.sqlContext
 
   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
+    val getColName: (StructField => String) =
+      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+        overlappedPartCols += getColName(partitionField) -> partitionField
+      }
+    }
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
+      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
   }
 
   def partitionSchemaOption: Option[StructType] =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 22e35a1bc0b1d7d49447c67eb1d3adf3c9303c16..f433a74da8cb9490cb0aa760801a2559b0cb9311 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       ))
     }
   }
+
+  test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = Seq((1L, 2.0)).toDF("a", "b")
+        df.write.parquet(s"$path/a=1")
+        checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+      }
+    }
+  }
 }
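
The following is a standalone sketch, not part of the patch: it mirrors the schema merge that HadoopFsRelation.schema performs after this change, so the effect of the fix can be inspected without running a Spark job. It assumes spark-sql is on the classpath; the object name SchemaMergeSketch, the helper mergedSchema, and the column names are illustrative only.

// Standalone sketch (not part of the patch) mirroring the merged-schema logic
// introduced above. Object/helper names and columns are hypothetical.
import org.apache.spark.sql.types._

object SchemaMergeSketch {
  // When a partition column shares a name with a data column, keep the data
  // column's position but use the partition column's StructField, so the
  // partition column's type wins for the overlapped name.
  def mergedSchema(
      dataSchema: StructType,
      partitionSchema: StructType,
      caseSensitive: Boolean): StructType = {
    val getColName: StructField => String =
      if (caseSensitive) _.name else _.name.toLowerCase
    val overlapped = partitionSchema
      .filter(p => dataSchema.exists(d => getColName(d) == getColName(p)))
      .map(p => getColName(p) -> p)
      .toMap
    StructType(
      dataSchema.map(f => overlapped.getOrElse(getColName(f), f)) ++
        partitionSchema.filterNot(f => overlapped.contains(getColName(f))))
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the new test: files written under a=1 carry a LongType column "a",
    // while partition discovery infers "a" as IntegerType.
    val dataSchema = StructType(Seq(
      StructField("a", LongType), StructField("b", DoubleType)))
    val partitionSchema = StructType(Seq(StructField("a", IntegerType)))
    val merged = mergedSchema(dataSchema, partitionSchema, caseSensitive = false)
    merged.foreach(f => println(s"${f.name}: ${f.dataType.simpleString}"))
    // Prints:
    //   a: int
    //   b: double
    // i.e. the partition column's type replaces the conflicting data column,
    // which is what the new test's checkAnswer(..., Row(1, 2.0)) relies on.
  }
}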