Commit 4f8ceed5 authored by hyukjinkwon, committed by Reynold Xin

[SPARK-16371][SQL] Do not push down filters incorrectly when inner name and outer name are the same in Parquet

## What changes were proposed in this pull request?

Currently, if there is a schema as below:

```
root
  |-- _1: struct (nullable = true)
  |    |-- _1: integer (nullable = true)
```

and if we execute the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

This pushes down a filter even though the filter is applied to a `StructType` column. (If my understanding is correct, Spark does not push down filters for those.)

The reason is, `ParquetFilters.getFieldMap` produces results below:

```
(_1,StructType(StructField(_1,IntegerType,true)))
(_1,IntegerType)
```

and then it becomes a `Map`, in which the duplicated key `_1` keeps only the last entry:

```
(_1,IntegerType)
```
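
As a small illustration of the collapse above, here is a sketch in plain Scala. The types are written as strings purely for illustration, and the object and value names are hypothetical, not taken from `ParquetFilters`.

```scala
object FieldMapCollapse {
  // Stand-ins for the two (name, type) pairs listed above.
  val pairs: Array[(String, String)] = Array(
    "_1" -> "StructType(StructField(_1,IntegerType,true))",
    "_1" -> "IntegerType"
  )

  // toMap keeps the last value seen for a duplicated key,
  // so the outer struct entry is silently overwritten by the inner integer entry.
  val fieldMap: Map[String, String] = pairs.toMap
}
```

Only the inner `IntegerType` entry survives, which is how the outer struct column ends up looking like an integer column.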

Now, because of ` ....lift(dataTypeOf(name)).map(_(name, value))`, a filter on `_1` is pushed down as if the column were `IntegerType`. However, it is actually `StructType`.
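
To make the `lift` step concrete, here is a minimal sketch of the lookup pattern. It assumes a tiny stand-in type model and a hypothetical `makeNotEq` builder that returns a string instead of a Parquet predicate; none of these definitions are the real Spark or Parquet classes.

```scala
object LiftSketch {
  // Simplified stand-ins for Spark's data types (assumption, not the real API).
  sealed trait DataType
  case object IntegerType extends DataType
  final case class StructType(fields: Seq[(String, DataType)]) extends DataType

  // Hypothetical predicate builder: defined only for atomic types, not for structs.
  val makeNotEq: PartialFunction[DataType, (String, Any) => String] = {
    case IntegerType => (name: String, value: Any) => s"noteq($name, $value)"
  }

  // Mirrors the shape of `lift(dataTypeOf(name)).map(_(name, value))`:
  // a predicate is produced only when the looked-up type is supported.
  def isNotNull(dataTypeOf: Map[String, DataType], name: String): Option[String] =
    if (dataTypeOf.contains(name)) makeNotEq.lift(dataTypeOf(name)).map(_(name, null))
    else None
}
```

With the correct type, `LiftSketch.isNotNull(Map("_1" -> StructType(Seq("_1" -> IntegerType))), "_1")` gives `None`, so nothing is pushed down. With the collapsed map, `LiftSketch.isNotNull(Map("_1" -> IntegerType), "_1")` builds a predicate for a column that is really a struct.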

So, Parquet filter2 produces incorrect results. For example, the code below:

```scala
df.filter("_1 IS NOT NULL").count()
```

always produces 0.

This PR prevents this by looking up only root-level fields instead of also collecting nested fields.
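
The difference between the old and new lookup can be sketched roughly as follows. This is a simplified model, not the actual `getFieldMap`: it uses stand-in types, omits the optional-field metadata check, and the names `flattened` and `rootOnly` are hypothetical.

```scala
object GetFieldMapSketch {
  // Simplified stand-ins for Spark's schema types (assumption, not the real API).
  sealed trait DataType
  case object IntegerType extends DataType
  final case class StructField(name: String, dataType: DataType)
  final case class StructType(fields: Seq[StructField]) extends DataType

  // Old behaviour: root fields plus every nested field, flattened into one list.
  def flattened(dt: DataType): Seq[(String, DataType)] = dt match {
    case StructType(fields) =>
      fields.map(f => f.name -> f.dataType) ++ fields.flatMap(f => flattened(f.dataType))
    case _ => Seq.empty
  }

  // New behaviour: root fields only, with no recursion into nested structs.
  def rootOnly(dt: DataType): Seq[(String, DataType)] = dt match {
    case StructType(fields) => fields.map(f => f.name -> f.dataType)
    case _ => Seq.empty
  }

  // The schema from the description: outer `_1` is a struct containing an integer `_1`.
  val inner: StructType = StructType(Seq(StructField("_1", IntegerType)))
  val schema: StructType = StructType(Seq(StructField("_1", inner)))
}
```

In this model, `flattened(schema).toMap` maps `_1` to `IntegerType`, while `rootOnly(schema).toMap` maps `_1` to the struct type, so the struct column is no longer mistaken for an integer column.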

## How was this patch tested?

Unit test in `ParquetFilterSuite`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #14067 from HyukjinKwon/SPARK-16371.
parent 480357cc
@@ -438,7 +438,7 @@ private[sql] class ParquetOutputWriterFactory(
ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
// We want to clear this temporary metadata from saving into Parquet file.
- // This metadata is only useful for detecting optional columns when pushdowning filters.
+ // This metadata is only useful for detecting optional columns when pushing down filters.
val dataSchemaToWrite = StructType.removeMetadata(
StructType.metadataKeyForOptionalField,
dataSchema).asInstanceOf[StructType]
@@ -185,10 +185,13 @@ private[sql] object ParquetFilters {
*/
private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match {
case StructType(fields) =>
+ // Here we don't flatten the fields in the nested schema but just look up through
+ // root fields. Currently, accessing to nested fields does not push down filters
+ // and it does not support to create filters for them.
fields.filter { f =>
!f.metadata.contains(StructType.metadataKeyForOptionalField) ||
!f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
- }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) }
+ }.map(f => f.name -> f.dataType)
case _ => Array.empty[(String, DataType)]
}
@@ -543,4 +543,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}
}
}
+ test("Do not push down filters incorrectly when inner name and outer name are the same") {
+ withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
+ // Here the schema becomes as below:
+ //
+ // root
+ //  |-- _1: struct (nullable = true)
+ //  |    |-- _1: integer (nullable = true)
+ //
+ // The inner column name, `_1` and outer column name `_1` are the same.
+ // Obviously this should not push down filters because the outer column is struct.
+ assert(df.filter("_1 IS NOT NULL").count() === 4)
+ }
+ }
}