Commit 3a205bbd authored by Yash Datta, committed by Michael Armbrust

[SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)

cc liancheng

Author: Yash Datta <Yash.Datta@guavus.com>

Closes #5390 from saucam/fpush and squashes the following commits:

3f026d6 [Yash Datta] SPARK-6742: Fix scalastyle
ce3d702 [Yash Datta] SPARK-6742: Add test case, fix scalastyle
8592acc [Yash Datta] SPARK-6742: Don't push down predicates which reference partition column(s)
parent 85ee0cab
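The heart of the change, visible in the first hunk below, is a guard applied before filters are handed to the Parquet scan: partition columns live only in directory names (for example part=1/), never inside the Parquet files themselves, so any predicate that references one must stay in the higher-level Filter operator. A minimal standalone sketch of that idea in plain Scala, using a toy Pred type rather than Spark's Expression classes:

object PartitionAwarePushdown {
  // A toy predicate: its textual form plus the column names it references.
  case class Pred(name: String, references: Set[String])

  // Keep only predicates that reference no partition column; everything
  // else must be evaluated above the scan, which is what filtersToPush does.
  def pushableFilters(filters: Seq[Pred], partitionCols: Set[String]): Seq[Pred] =
    filters.filter(_.references.intersect(partitionCols).isEmpty)

  def main(args: Array[String]): Unit = {
    val filters = Seq(
      Pred("a = 1", Set("a")),       // safe to push into the reader
      Pred("part = 1", Set("part"))) // partition column: keep it above the scan
    println(pushableFilters(filters, Set("part")).map(_.name)) // List(a = 1)
  }
}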
@@ -215,6 +215,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
+        val filtersToPush = filters.filter { pred =>
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
         val prunePushedDownFilters =
           if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
@@ -226,6 +231,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
               // "A AND B" in the higher-level filter, not just "B".
               predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
                 case (predicate, None) => predicate
+                // Filter needs to be applied above when it contains partitioning
+                // columns
+                case (predicate, _) if !predicate.references.map(_.name).toSet
+                  .intersect(partitionColNames).isEmpty => predicate
               }
             }
           } else {
@@ -238,7 +247,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
       case _ => Nil
     }
+            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
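The hunk above handles the complementary side: prunePushedDownFilters decides which predicates must still be re-evaluated above the scan, and it now retains a predicate either when ParquetFilters.createFilter returns None (no native Parquet filter exists for it) or when it references a partition column. A sketch of that collect-based retention with toy types and a stand-in createFilter (the real conversion rules are more involved):

object RetainUnpushable {
  case class Pred(name: String, references: Set[String])

  // Stand-in for ParquetFilters.createFilter: pretend only single-column
  // predicates have a native Parquet filter representation.
  def createFilter(p: Pred): Option[String] =
    if (p.references.size == 1) Some(s"native(${p.name})") else None

  // Mirrors the two `case` arms in the diff: keep a predicate above the
  // scan if it has no native filter OR it touches a partition column.
  def retained(predicates: Seq[Pred], partitionCols: Set[String]): Seq[Pred] =
    predicates.map(p => p -> createFilter(p)).collect {
      case (predicate, None) => predicate
      case (predicate, _) if predicate.references.intersect(partitionCols).nonEmpty =>
        predicate
    }

  def main(args: Array[String]): Unit = {
    val preds = Seq(
      Pred("a = 1", Set("a")),               // converts, no partition column: dropped
      Pred("part = 1", Set("part")),         // converts, but partition column: kept
      Pred("a = 1 or b = 2", Set("a", "b"))) // no native filter in this toy: kept
    retained(preds, Set("part")).foreach(p => println(p.name))
  }
}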
@@ -22,7 +22,7 @@ import parquet.filter2.predicate.Operators._
 import parquet.filter2.predicate.{FilterPredicate, Operators}

 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
@@ -350,4 +350,26 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6742: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
+          path,
+          Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
+          Seq(AttributeReference("part", IntegerType, false)())))
+
+        checkAnswer(
+          df.filter("a = 1 or part = 1"),
+          (1 to 3).map(i => Row(1, i, i.toString)))
+      }
+    }
+  }
 }
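The test leans on Hive-style partition paths: writing under ${dir.getCanonicalPath}/part=1 makes part a partition column whose value 1 comes from the path segment rather than the file, so part = 1 matches all three rows, while a filter on part pushed into the reader would fail because the column is absent from the file schema. A toy sketch of that path-based encoding (hypothetical helper, not Spark's actual partition discovery):

object PartitionPath {
  // Extract partition column values from a Hive-style path such as
  // ".../part=1/data.parquet"; the values never appear inside the file.
  def partitionValues(path: String): Map[String, String] =
    path.split('/').collect {
      case seg if seg.contains('=') =>
        val Array(k, v) = seg.split('=')
        k -> v
    }.toMap

  def main(args: Array[String]): Unit = {
    println(partitionValues("/tmp/table/part=1/data.parquet")) // Map(part -> 1)
  }
}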