diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index bfebfa0c28c52fe4f8931217efaa8c270f08d39c..043be58edc91ba46c0a388bb4a6a73bc6d796733 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -206,17 +206,21 @@ class SQLContext(@transient val sparkContext: SparkContext) * final desired output requires complex expressions to be evaluated or when columns can be * further eliminated out after filtering has been done. * + * The `prunePushedDownFilters` parameter is used to remove those filters that can be optimized + * away by the filter pushdown optimization. + * * The required attributes for both filtering and expression evaluation are passed to the * provided `scanBuilder` function so that it can avoid unnecessary column materialization. */ def pruneFilterProject( projectList: Seq[NamedExpression], filterPredicates: Seq[Expression], + prunePushedDownFilters: Seq[Expression] => Seq[Expression], scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = { val projectSet = projectList.flatMap(_.references).toSet val filterSet = filterPredicates.flatMap(_.references).toSet - val filterCondition = filterPredicates.reduceLeftOption(And) + val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 394a59700dbafe109bf40741d409bb777456db47..cfa8bdae58b11ef8f109f40d2dfcabfab65fe939 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -141,14 +141,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) => InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => { - val remainingFilters = + val prunePushedDownFilters = if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) { - filters.filter { - // Note: filters cannot be pushed down to Parquet if they contain more complex - // expressions than simple "Attribute cmp Literal" comparisons. Here we remove - // all filters that have been pushed down. Note that a predicate such as - // "(A AND B) OR C" can result in "A OR C" being pushed down. - filter => + (filters: Seq[Expression]) => { + filters.filter { filter => + // Note: filters cannot be pushed down to Parquet if they contain more complex + // expressions than simple "Attribute cmp Literal" comparisons. Here we remove + // all filters that have been pushed down. Note that a predicate such as + // "(A AND B) OR C" can result in "A OR C" being pushed down. val recordFilter = ParquetFilters.createFilter(filter) if (!recordFilter.isDefined) { // First case: the pushdown did not result in any record filter. @@ -159,13 +159,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // still want to keep "A AND B" in the higher-level filter, not just "B". !ParquetFilters.findExpression(recordFilter.get, filter).isDefined } + } } } else { - filters + identity[Seq[Expression]] _ } pruneFilterProject( projectList, - remainingFilters, + filters, + prunePushedDownFilters, ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 65f4c17aeee3a44390ccc1a5fc8202ef5e9d5887..f9731e82e49245b2e698b934799a721d1cdc693a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -358,5 +358,9 @@ class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll { assert(stringResult(0).getString(2) == "100", "stringvalue incorrect") assert(stringResult(0).getInt(1) === 100) } -} + test("SPARK-1913 regression: columns only referenced by pushed down filters should remain") { + val query = sql(s"SELECT mystring FROM testfiltersource WHERE myint < 10") + assert(query.collect().size === 10) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b2157074a41bf75b974ddcd2fad4995e35e01e31..8b51957162e04f5ad546bae473be41a8f220e802 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -69,6 +69,7 @@ private[hive] trait HiveStrategies { pruneFilterProject( projectList, otherPredicates, + identity[Seq[Expression]], HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil case _ => Nil