Skip to content
Snippets Groups Projects
Commit 985b38dd authored by Zee Chen, committed by Michael Armbrust
Browse files

[SPARK-11390][SQL] Query plan with/without filterPushdown indistinguishable

…ishable

Propagate pushed filters to PhysicalRDD in DataSourceStrategy.apply

Author: Zee Chen <zeechen@us.ibm.com>

Closes #9679 from zeocio/spark-11390.
parent b1a96626
No related branches found
No related tags found
No related merge requests found
......@@ -106,7 +106,9 @@ private[sql] object PhysicalRDD {
def createFromDataSource(
output: Seq[Attribute],
rdd: RDD[InternalRow],
relation: BaseRelation): PhysicalRDD = {
PhysicalRDD(output, rdd, relation.toString, relation.isInstanceOf[HadoopFsRelation])
relation: BaseRelation,
extraInformation: String = ""): PhysicalRDD = {
PhysicalRDD(output, rdd, relation.toString + extraInformation,
relation.isInstanceOf[HadoopFsRelation])
}
}
......@@ -315,6 +315,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)
val pushedFiltersString = pushedFilters.mkString(" PushedFilter: [", ",", "] ")
if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
......@@ -332,7 +334,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation)
relation.relation, pushedFiltersString)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
// Don't request columns that are only referenced by pushed filters.
......@@ -342,7 +344,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation)
relation.relation, pushedFiltersString)
execution.Project(
projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
......
......@@ -160,6 +160,20 @@ class PlannerSuite extends SharedSQLContext {
}
}
test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
  // Round-trip `testData` through a parquet file at the path supplied by `withTempPath`,
  // expose the re-read DataFrame as the table "testPushed", and check that the string form
  // of the executed plan for an equality-filtered query mentions the pushed-down filter.
  withTempPath { dir =>
    val parquetPath = dir.getCanonicalPath
    testData.write.parquet(parquetPath)
    val reloaded = sqlContext.read.parquet(parquetPath)
    sqlContext.registerDataFrameAsTable(reloaded, "testPushed")
    withTempTable("testPushed") {
      val executed = sql("select * from testPushed where key = 15").queryExecution.executedPlan
      val planText = executed.toString
      // The expected fragment is the one DataSourceStrategy appends to the scan description.
      assert(planText.contains("PushedFilter: [EqualTo(key,15)]"))
    }
  }
}
test("efficient limit -> project -> sort") {
{
val query =
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment