Commit fd50df41 authored by Kevin Yu, committed by Michael Armbrust

[SPARK-12231][SQL] create a combineFilters' projection when we call buildPartitionedTableScan

Hello Michael & All:

We had some issues submitting the new code in the other PR (#10299), so we closed that PR and opened this one with the fix.

The reason for the previous failure is that, when there is a filter that is not pushed down (the "left-over" filter), the projection built for the scan can differ from the original projection in its elements or their ordering.
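For a concrete illustration, here is a hedged sketch adapted from the new test below (`sqlContext` and `path` are assumed to be set up as in that test): the table is partitioned by column "a", the predicate mixes the partition column with a normal column so it cannot be pushed down, and the query itself projects no columns.

    // Table written with write.partitionBy("a").parquet(path)
    val df = sqlContext.read.parquet(path)
    // "a > 1 or b < 2" references both the partition column "a" and the normal
    // column "b", so it is a "left-over" filter; count() leaves the original
    // projection empty, yet the scan must still output "a" and "b" for the filter.
    df.filter("a > 1 or b < 2").count()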

With the new code, the approach to solving this problem is:

Insert a new Project when the "left-over" filter is nonempty and the original projection is non-empty but differs from the projection built for the scan (which could otherwise introduce extra columns or a different column ordering).
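Below is a minimal, self-contained sketch of that decision. The plan nodes (Scan, Filter, Project), the helper, and the column names are hypothetical stand-ins for illustration, not Spark's actual Catalyst/execution classes.

    object Spark12231Sketch {
      // Hypothetical stand-ins for physical plan nodes, for illustration only.
      sealed trait Plan
      case class Scan(output: Seq[String]) extends Plan
      case class Filter(condition: String, child: Plan) extends Plan
      case class Project(columns: Seq[String], child: Plan) extends Plan

      // Mirrors the idea of the fix: the scan is built with the (possibly widened)
      // projection needed by the left-over filter; an extra Project restores the
      // original projection only when it is non-empty and differs from the scan's.
      def plan(originalProjects: Seq[String],
               scanProjects: Seq[String],
               leftOverFilter: Option[String]): Plan = {
        val scan = Scan(scanProjects)
        leftOverFilter.map { cond =>
          if (originalProjects.isEmpty || originalProjects == scanProjects) {
            Filter(cond, scan)                            // no extra Project needed
          } else {
            Project(originalProjects, Filter(cond, scan)) // restore columns and ordering
          }
        }.getOrElse(scan)
      }

      def main(args: Array[String]): Unit = {
        // SELECT b, c FROM t WHERE a > 1 OR b < 2, with "a" a partition column:
        // the scan must also output "a", so an extra Project returns only (b, c).
        println(plan(Seq("b", "c"), Seq("a", "b", "c"), Some("a > 1 or b < 2")))
      }
    }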

We add three test cases to cover the cases that would otherwise fail.

Author: Kevin Yu <qyu@us.ibm.com>

Closes #10388 from kevinyu98/spark-12231.
parent 8543997f
@@ -77,7 +77,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         val pushedFilters = filters.filter(_.references.intersect(partitionColumns).isEmpty)
         // Predicates with both partition keys and attributes
-        val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+        val partitionAndNormalColumnFilters =
+          filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
         val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
@@ -88,16 +89,33 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
             s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
         }
+        // need to add projections from "partitionAndNormalColumnAttrs" in if it is not empty
+        val partitionAndNormalColumnAttrs = AttributeSet(partitionAndNormalColumnFilters)
+        val partitionAndNormalColumnProjs = if (partitionAndNormalColumnAttrs.isEmpty) {
+          projects
+        } else {
+          (partitionAndNormalColumnAttrs ++ projects).toSeq
+        }
         val scan = buildPartitionedTableScan(
           l,
-          projects,
+          partitionAndNormalColumnProjs,
           pushedFilters,
           t.partitionSpec.partitionColumns,
           selectedPartitions)
-        combineFilters
-          .reduceLeftOption(expressions.And)
-          .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
+        // Add a Projection to guarantee the original projection:
+        // this is because "partitionAndNormalColumnAttrs" may be different
+        // from the original "projects", in elements or their ordering
+        partitionAndNormalColumnFilters.reduceLeftOption(expressions.And).map(cf =>
+          if (projects.isEmpty || projects == partitionAndNormalColumnProjs) {
+            // if the original projection is empty, no need for the additional Project either
+            execution.Filter(cf, scan)
+          } else {
+            execution.Project(projects, execution.Filter(cf, scan))
+          }
+        ).getOrElse(scan) :: Nil
       // Scanning non-partitioned HadoopFsRelation
       case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
......
@@ -325,6 +325,47 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
+  test("SPARK-12231: test the filter and empty project in partitioned DataSource scan") {
+    import testImplicits._
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}"
+        (1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
+          write.partitionBy("a").parquet(path)
+        // The filter "a > 1 or b < 2" will not get pushed down and the projection is empty;
+        // without the fix this query throws an exception, because the left-over filter needs
+        // columns "a" and "b" while the scan is built with the empty original projection.
+        val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
+        assert(df1.filter("a > 1 or b < 2").count() == 2)
+      }
+    }
+  }
test("SPARK-12231: test the new projection in partitioned DataSource scan") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}"
(1 to 3).map(i => (i, i + 1, i + 2, i + 3)).toDF("a", "b", "c", "d").
write.partitionBy("a").parquet(path)
// test the generate new projection case
// when projects != partitionAndNormalColumnProjs
val df1 = sqlContext.read.parquet(dir.getCanonicalPath)
checkAnswer(
df1.filter("a > 1 or b > 2").orderBy("a").selectExpr("a", "b", "c", "d"),
(2 to 3).map(i => Row(i, i + 1, i + 2, i + 3)))
}
}
}
test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._
......