Commit 1baaf2b9 authored by Cheng Hao, committed by Cheng Lian

[SPARK-10829] [SQL] Filter combine partition key and attribute doesn't work in DataSource scan

```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
  withTempPath { dir =>
    val path = s"${dir.getCanonicalPath}/part=1"
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

    // If the "part = 1" filter gets pushed down, this query will throw an exception since
    // "part" is not a valid column in the actual Parquet file
    checkAnswer(
      sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
      (2 to 3).map(i => Row(i, i.toString, 1)))
  }
}
```

We expect the result to be:
```
2,1
3,1
```
But we got:
```
1,1
2,1
3,1
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #8916 from chenghao-intel/partition_filter.
parent 2b5e31c7
DataSourceStrategy.scala

```diff
@@ -62,7 +62,22 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      // We divide the filter expressions into 3 parts
+      val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+
+      // TODO this is case-sensitive
+      // Only prunning the partition keys
+      val partitionFilters =
+        filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+
+      // Only pushes down predicates that do not reference partition keys.
+      val pushedFilters =
+        filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+
+      // Predicates with both partition keys and attributes
+      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+
+      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray

       logInfo {
         val total = t.partitionSpec.partitions.length
@@ -71,21 +86,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }

-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-        filters.filter { f =>
-          val referencedColumnNames = f.references.map(_.name).toSet
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-
-      buildPartitionedTableScan(
+      val scan = buildPartitionedTableScan(
         l,
         projects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
+        selectedPartitions)
+
+      combineFilters
+        .reduceLeftOption(expressions.And)
+        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil

     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
```
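The residual `combineFilters` are folded back into a single conjunction and evaluated in a `Filter` node above the partitioned scan. Here is a minimal sketch of that `reduceLeftOption` / `getOrElse` idiom, using hypothetical `Expr` and `Plan` stand-ins rather than Catalyst's `expressions.And` and `execution.Filter`:

```scala
// Stand-alone sketch of the residual-predicate handling above: AND the
// left-over predicates together and wrap the scan in a Filter node only when
// something is left. All types here are hypothetical stand-ins.
object ResidualFilterSketch extends App {
  sealed trait Expr
  final case class Raw(sql: String) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  case object Scan extends Plan
  final case class Filter(condition: Expr, child: Plan) extends Plan

  def planWithResidual(residual: Seq[Expr], scan: Plan): Plan =
    residual
      .reduceLeftOption(And)   // AND all residual predicates together, if any
      .map(Filter(_, scan))    // put them in a Filter node above the scan
      .getOrElse(scan)         // no residual predicates: return the bare scan

  println(planWithResidual(Seq(Raw("part = 0 OR a > 1")), Scan)) // Filter(Raw(part = 0 OR a > 1),Scan)
  println(planWithResidual(Nil, Scan))                           // Scan
}
```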
ParquetFilterSuite.scala

```diff
@@ -297,4 +297,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+          (2 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
```
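For context on the expected rows: because the data is written under a `part=1` directory, partition discovery adds a `part` column with value 1 on read, which is why the expected answer is `Row(i, i.toString, 1)` rather than just `(a, b)`. A rough spark-shell style reproduction sketch, assuming a Spark 1.5-era `sqlContext` is in scope and that the path below (hypothetical) is writable:

```scala
// Rough reproduction outside the test harness; `sqlContext` is assumed to be
// the shell's Spark 1.5-era SQLContext, and the path is an arbitrary writable
// location chosen for this sketch.
import sqlContext.implicits._

val path = "/tmp/spark-10829/part=1"  // "part=1" in the directory name becomes a partition column
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

val df = sqlContext.read.parquet(path)
df.printSchema()  // prints columns a, b, and the discovered `part` column

// With the fix, only `a > 0` is pushed into Parquet; `part = 0 or a > 1` is
// evaluated above the scan, so only rows 2 and 3 survive.
val rows = df.filter("a > 0 and (part = 0 or a > 1)").collect()
assert(rows.map(r => (r.getInt(0), r.getString(1), r.getInt(2))).sorted ==
  Seq((2, "2", 1), (3, "3", 1)))
```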