Commit f6883bb7 authored by hyukjinkwon's avatar hyukjinkwon Committed by Cheng Lian

[SPARK-11676][SQL] Parquet filter tests all pass if filters are not really pushed down

Currently, the Parquet predicate pushdown tests all pass even if filters are not actually pushed down, or if pushdown is disabled entirely.

In this PR, to check that filters are created, the test extracts the predicate expression from the plan's `expression.Filter` and then tries to create data source filters from it, just as Spark does.
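
Condensed from the patch below, this is the filter-creation check inside the `checkFilterPredicate` helper (Spark-internal APIs; `query` and `df` are built by the surrounding test helper):

```scala
// Collect the analyzed predicate and the ParquetRelation from the optimized plan.
var maybeRelation: Option[ParquetRelation] = None
val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
  case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
    maybeRelation = Some(relation)
    filters
}.flatten.reduceLeftOption(_ && _)
assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")

// Translate the Catalyst predicate into data source filters, then make sure
// Parquet can actually build a filter predicate from each of them.
val (_, selectedFilters) =
  DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
assert(selectedFilters.nonEmpty, "No filter is pushed down")

selectedFilters.foreach { pred =>
  val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
  assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
}
```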

To check the results, the test accesses the child RDD of `expression.Filter` directly, so it only sees the rows that the data source itself returned (which should already be filtered), and then compares them against the expected values.

Now, if filters are not pushed down or if pushdown is disabled, these tests fail instead of silently passing.
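
The results check relies on the `stripSparkFilter` helper added in this patch, which rebuilds the DataFrame directly on the child of the Spark-side `Filter` operator, followed by how the existing hard-coded test uses it (`path` as in that test):

```scala
/**
 * Strip Spark-side filtering so that only the rows the data source itself
 * returned (i.e. after Parquet-side filtering) remain in the DataFrame.
 */
protected def stripSparkFilter(df: DataFrame): DataFrame = {
  val schema = df.schema
  val childRDD = df
    .queryExecution
    .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
    .child        // the scan below the Spark-side Filter operator
    .execute()    // RDD[InternalRow] produced by the data source
    .map(row => Row.fromSeq(row.toSeq(schema)))

  sqlContext.createDataFrame(childRDD, schema)
}

// If "a = 2" was really pushed down to Parquet, the scan returns a single row;
// if pushdown silently did not happen, all rows come back and the assertion fails.
val df = sqlContext.read.parquet(path).filter("a = 2")
assert(stripSparkFilter(df).count == 1)
```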

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #9659 from HyukjinKwon/SPARK-11676.
parent 3934562d
@@ -50,27 +50,33 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     val output = predicate.collect { case a: Attribute => a }.distinct
 
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val query = df
-        .select(output.map(e => Column(e)): _*)
-        .where(Column(predicate))
-
-      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-        case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
-      }.flatten.reduceLeftOption(_ && _)
-      assert(maybeAnalyzedPredicate.isDefined)
-
-      val selectedFilters = maybeAnalyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
-      assert(selectedFilters.nonEmpty)
-
-      selectedFilters.foreach { pred =>
-        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
-        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-        maybeFilter.foreach { f =>
-          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-          assert(f.getClass === filterClass)
-        }
-      }
-      checker(query, expected)
+      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+        val query = df
+          .select(output.map(e => Column(e)): _*)
+          .where(Column(predicate))
+
+        var maybeRelation: Option[ParquetRelation] = None
+        val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+          case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
+            maybeRelation = Some(relation)
+            filters
+        }.flatten.reduceLeftOption(_ && _)
+        assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+        val (_, selectedFilters) =
+          DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+        assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+        selectedFilters.foreach { pred =>
+          val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+          assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+          maybeFilter.foreach { f =>
+            // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+            assert(f.getClass === filterClass)
+          }
+        }
+        checker(stripSparkFilter(query), expected)
+      }
     }
   }
@@ -104,6 +110,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
+  /**
+   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
+   */
+  protected def stripSparkFilter(df: DataFrame): DataFrame = {
+    val schema = df.schema
+    val childRDD = df
+      .queryExecution
+      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+      .child
+      .execute()
+      .map(row => Row.fromSeq(row.toSeq(schema)))
+
+    sqlContext.createDataFrame(childRDD, schema)
+  }
+
   test("filter pushdown - boolean") {
     withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -347,19 +368,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
         (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
         val df = sqlContext.read.parquet(path).filter("a = 2")
 
-        // This is the source RDD without Spark-side filtering.
-        val childRDD =
-          df
-            .queryExecution
-            .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-            .child
-            .execute()
-
         // The result should be single row.
         // When a filter is pushed to Parquet, Parquet can apply it to every row.
         // So, we can check the number of rows returned from the Parquet
         // to make sure our filter pushdown work.
-        assert(childRDD.count == 1)
+        assert(stripSparkFilter(df).count == 1)
       }
     }
   }