diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index a6a6cef5861f3e0003f99367c9f7867d3daaff0e..763841efbd9f372805a18b16bb8c04ede59f9952 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
    * Converts data sources filters to Parquet filter predicates.
    */
   def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val dataTypeOf = getFieldMap(schema)
+    val nameToType = getFieldMap(schema)
+
+    // Parquet does not allow dots in the column name because dots are used as a column path
+    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates that
+    // reference missing columns, so pushing down a filter on a column whose name contains
+    // dots can return incorrect results. Thus, we do not push down such filters.
+    // See SPARK-20364.
+    def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")
 
     // NOTE:
     //
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
     // Probably I missed something and obviously this should be changed.
 
     predicate match {
-      case sources.IsNull(name) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, null))
-      case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
-
-      case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
-        makeEq.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
-        makeNotEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
-        makeLt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeLtEq.lift(dataTypeOf(name)).map(_(name, value))
-
-      case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
-        makeGt.lift(dataTypeOf(name)).map(_(name, value))
-      case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
-        makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
+      case sources.IsNull(name) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, null))
+      case sources.IsNotNull(name) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, null))
+
+      case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
+        makeEq.lift(nameToType(name)).map(_(name, value))
+      case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
+        makeNotEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.LessThan(name, value) if canMakeFilterOn(name) =>
+        makeLt.lift(nameToType(name)).map(_(name, value))
+      case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeLtEq.lift(nameToType(name)).map(_(name, value))
+
+      case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
+        makeGt.lift(nameToType(name)).map(_(name, value))
+      case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
+        makeGtEq.lift(nameToType(name)).map(_(name, value))
 
       case sources.And(lhs, rhs) =>
         // At here, it is not safe to just convert one side if we do not understand the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index dd53b561326f39bf2e889ce7141d96424f0c9b9c..98427cfe3031c1dbe4e7ae4a884f920fadcc2a9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
+    import testImplicits._
+
+    Seq(true, false).foreach { vectorized =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
+          SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
+        withTempPath { path =>
+          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
+          val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
+          assert(readBack.count() == 1)
+        }
+      }
+    }
+  }
 }
 
 class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
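For reference, below is a minimal standalone sketch of the scenario the new regression test exercises, assuming a local SparkSession and a writable temp directory; the object name, master setting, and temp-path handling are illustrative and not part of the patch. It writes a Parquet file containing a column whose name has a dot, then reads it back with an `IS NOT NULL` filter while pushdown is enabled.

```scala
import java.nio.file.Files

import org.apache.spark.sql.SparkSession

object DottedColumnPushdownRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-20364 repro")
      // Keep Parquet filter pushdown enabled so the dotted-name case is actually exercised.
      .config("spark.sql.parquet.filterPushdown", "true")
      .getOrCreate()
    import spark.implicits._

    // One non-null and one null value in a column whose name contains a dot.
    val path = Files.createTempDirectory("spark-20364").resolve("data").toString
    Seq(Some(1), None).toDF("col.dots").write.parquet(path)

    // Backticks keep `col.dots` as a single top-level column in the SQL expression.
    val readBack = spark.read.parquet(path).where("`col.dots` IS NOT NULL")

    // Before this change, the pushed-down predicate referenced the (missing) path col -> dots,
    // which Parquet 1.8.2+ (PARQUET-389) accepts and treats as all-null, so the count could
    // come back as 0. With pushdown skipped for dotted names, this prints 1.
    println(readBack.count())

    spark.stop()
  }
}
```

Because dots are a column path delimiter in Parquet's filter API, the patch takes the conservative route of skipping pushdown for such names via `canMakeFilterOn` rather than attempting to escape them; the filter is still applied by Spark after the scan, so results stay correct.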