diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 15c0ac7361168500a7f9bd6dde812767012c8948..0841636d3309ff1ad0cabb71b11babd680da7273 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -332,7 +332,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
     }}
 
-    val (unhandledPredicates, pushedFilters) = selectFilters(relation.relation, candidatePredicates)
+    val (unhandledPredicates, pushedFilters, handledFilters) =
+      selectFilters(relation.relation, candidatePredicates)
 
     // A set of column attributes that are only referenced by pushed down filters. We can eliminate
     // them from requested columns.
@@ -349,8 +350,13 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     val metadata: Map[String, String] = {
       val pairs = ArrayBuffer.empty[(String, String)]
+
+      // Mark filters which are handled by the underlying DataSource with an asterisk
       if (pushedFilters.nonEmpty) {
-        pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
+        val markedFilters = for (filter <- pushedFilters) yield {
+          if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
+        }
+        pairs += (PUSHED_FILTERS -> markedFilters.mkString("[", ", ", "]"))
       }
       pairs.toMap
     }
@@ -492,13 +498,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
    * Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s
    * and can be handled by `relation`.
    *
-   * @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
-   *         predicate [[Expression]]s that are either not convertible or cannot be handled by
-   *         `relation`. The second element contains all converted data source [[Filter]]s that
-   *         will be pushed down to the data source.
+   * @return A triplet of `Seq[Expression]`, `Seq[Filter]`, and `Set[Filter]`. The first element
+   *         contains all Catalyst predicate [[Expression]]s that are either not convertible or
+   *         cannot be handled by `relation`. The second element contains all converted data source
+   *         [[Filter]]s that will be pushed down to the data source. The third element contains
+   *         all [[Filter]]s that are completely evaluated at the DataSource.
    */
   protected[sql] def selectFilters(
-      relation: BaseRelation, predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {
+      relation: BaseRelation,
+      predicates: Seq[Expression]): (Seq[Expression], Seq[Filter], Set[Filter]) = {
+
     // For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
     // called `predicate`s, while all data source filters of type `sources.Filter` are simply called
     // `filter`s.
@@ -521,7 +530,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val unhandledPredicates = translatedMap.filter { case (p, f) =>
       unhandledFilters.contains(f)
     }.keys
+    val handledFilters = pushedFilters.toSet -- unhandledFilters
 
-    (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters)
+    (nonconvertiblePredicates ++ unhandledPredicates, pushedFilters, handledFilters)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f59d474d00ec2b632da7192c6e2e777a37fe45cd..d846b27ffed03dc6940c2cf7f1593f73220f9bed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -69,7 +69,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }.flatten.reduceLeftOption(_ && _)
       assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
 
-      val (_, selectedFilters) =
+      val (_, selectedFilters, _) =
         DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
       assert(selectedFilters.nonEmpty, "No filter is pushed down")
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 7a30e548cd8c77c4c784ced2ab800a0896af3f4d..471192a369f4ac7b678b271e9197b67e65f9216e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -51,7 +51,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       }.flatten.reduceLeftOption(_ && _)
       assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
 
-      val (_, selectedFilters) =
+      val (_, selectedFilters, _) =
         DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
       assert(selectedFilters.nonEmpty, "No filter is pushed down")
 
@@ -95,7 +95,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
       }.flatten.reduceLeftOption(_ && _)
       assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
 
-      val (_, selectedFilters) =
+      val (_, selectedFilters, _) =
         DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
       assert(selectedFilters.nonEmpty, "No filter is pushed down")
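
For context, here is a minimal, self-contained Scala sketch of the marking behavior the first hunk introduces. It is illustrative only: plain strings stand in for `sources.Filter` instances, and the object and method names are hypothetical, not part of the patch.

    // Illustrative sketch; strings stand in for org.apache.spark.sql.sources.Filter.
    object PushedFilterMarkingSketch {
      // Mirrors the patched metadata logic: filters fully handled by the
      // data source are prefixed with '*', the rest are left unmarked.
      def markFilters(pushedFilters: Seq[String], handledFilters: Set[String]): String = {
        val markedFilters = for (filter <- pushedFilters) yield {
          if (handledFilters.contains(filter)) s"*$filter" else s"$filter"
        }
        markedFilters.mkString("[", ", ", "]")
      }

      def main(args: Array[String]): Unit = {
        // Suppose the source fully evaluates IsNotNull(a) but reports
        // GreaterThan(a,1) as unhandled, so Spark must re-apply the latter.
        val pushed = Seq("IsNotNull(a)", "GreaterThan(a,1)")
        val handled = Set("IsNotNull(a)")
        println(markFilters(pushed, handled)) // [*IsNotNull(a), GreaterThan(a,1)]
      }
    }

In an EXPLAIN output this would surface as something like PushedFilters: [*IsNotNull(a), GreaterThan(a,1)], indicating the scan evaluates IsNotNull(a) completely, while GreaterThan(a,1) still needs a Filter operator above it.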