Commit d8ef0be8 authored by Takeshi YAMAMURO, committed by Wenchen Fan

[SPARK-18108][SQL] Fix a schema inconsistent bug that makes a parquet reader fail to read data


## What changes were proposed in this pull request?
The vectorized parquet reader fails to read column data when the data schema and the partition schema overlap and the types inferred for the partition columns differ from the types in the data schema. Example code to reproduce this bug:

```
scala> case class A(a: Long, b: Int)
scala> val as = Seq(A(1, 2))
scala> spark.createDataFrame(as).write.parquet("/data/a=1/")
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: long (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
java.lang.NullPointerException
        at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:283)
        at org.apache.spark.sql.execution.vectorized.ColumnarBatch$Row.getLong(ColumnarBatch.java:191)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
```
The root cause is that the logical layer (`HadoopFsRelation`) and the physical layer (`VectorizedParquetRecordReader`) make different assumptions about the partition schema: the logical layer trusts the data schema to infer the types of the overlapped partition columns, whereas the physical layer trusts the partition schema inferred from the path string. To fix this bug, this PR updates `HadoopFsRelation.schema` so that overlapped columns keep their position from the data schema but take their type from the partition schema.
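
For illustration, with this change the overlapped column `a` should resolve to the type inferred from the partition path (`integer`) while keeping its position from the data schema, so the same session no longer hits the NPE. The output below is a sketch of the expected post-fix behavior, not captured from a run:

```
scala> val df = spark.read.parquet("/data/")
scala> df.printSchema
root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
scala> df.collect
res1: Array[org.apache.spark.sql.Row] = Array([1,2])
```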

## How was this patch tested?
Added a regression test in `ParquetPartitionDiscoverySuite`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16030 from maropu/SPARK-18108.

(cherry picked from commit dc2a4d4a)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent a73201da
`HadoopFsRelation.scala`:

```diff
@@ -17,11 +17,13 @@

 package org.apache.spark.sql.execution.datasources

+import scala.collection.mutable
+
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}

 /**
@@ -49,10 +51,16 @@ case class HadoopFsRelation(
   override def sqlContext: SQLContext = sparkSession.sqlContext

   val schema: StructType = {
-    val dataSchemaColumnNames = dataSchema.map(_.name.toLowerCase).toSet
-    StructType(dataSchema ++ partitionSchema.filterNot { column =>
-      dataSchemaColumnNames.contains(column.name.toLowerCase)
-    })
+    val getColName: (StructField => String) =
+      if (sparkSession.sessionState.conf.caseSensitiveAnalysis) _.name else _.name.toLowerCase
+    val overlappedPartCols = mutable.Map.empty[String, StructField]
+    partitionSchema.foreach { partitionField =>
+      if (dataSchema.exists(getColName(_) == getColName(partitionField))) {
+        overlappedPartCols += getColName(partitionField) -> partitionField
+      }
+    }
+    StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
+      partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
   }

   def partitionSchemaOption: Option[StructType] =
```
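
To make the merging rule concrete, here is a small self-contained sketch that mirrors the logic above. `mergeSchemas` is a hypothetical helper, not Spark's internal API; the case-sensitivity flag stands in for `sparkSession.sessionState.conf.caseSensitiveAnalysis`:

```scala
import org.apache.spark.sql.types._

def mergeSchemas(
    dataSchema: StructType,
    partitionSchema: StructType,
    caseSensitive: Boolean): StructType = {
  def colName(f: StructField): String =
    if (caseSensitive) f.name else f.name.toLowerCase
  // Partition columns that also appear in the data schema, keyed by normalized name.
  val overlapped: Map[String, StructField] = partitionSchema
    .filter(pf => dataSchema.exists(df => colName(df) == colName(pf)))
    .map(pf => colName(pf) -> pf)
    .toMap
  // Overlapped columns keep their data-schema position but take the
  // partition-schema type; partition-only columns are appended at the end.
  StructType(dataSchema.map(f => overlapped.getOrElse(colName(f), f)) ++
    partitionSchema.filterNot(f => overlapped.contains(colName(f))))
}

// Data schema (a: long, b: int) merged with partition schema (a: int)
// yields (a: int, b: int): `a` keeps its position but takes the partition type.
val merged = mergeSchemas(
  StructType(Seq(StructField("a", LongType), StructField("b", IntegerType))),
  StructType(Seq(StructField("a", IntegerType))),
  caseSensitive = false)
println(merged.simpleString)  // struct<a:int,b:int>
```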
`ParquetPartitionDiscoverySuite.scala`:

```diff
@@ -969,4 +969,15 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
     ))
   }
+
+  test("SPARK-18108 Parquet reader fails when data column types conflict with partition ones") {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = Seq((1L, 2.0)).toDF("a", "b")
+        df.write.parquet(s"$path/a=1")
+        checkAnswer(spark.read.parquet(s"$path"), Seq(Row(1, 2.0)))
+      }
+    }
+  }
 }
```
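
Note that `checkAnswer` expects `Row(1, 2.0)` rather than `Row(1L, 2.0)`: with the fixed schema merging, the overlapped column `a` takes the `IntegerType` inferred from the partition path `a=1`, even though the written data column was a `Long`.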