Commit ec100321 authored by Cheng Lian, committed by Michael Armbrust

[SPARK-5465] [SQL] Fixes filter push-down for Parquet data source

Not all Catalyst filter expressions can be converted to Parquet filter predicates. We should try to convert each individual predicate and then collect those convertible ones.
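
A minimal, self-contained sketch of the strategy this commit adopts. `Expr`, `Leaf`, `PqPred`, and `tryConvert` are hypothetical stand-ins for Catalyst's `Expression`, Parquet's `FilterPredicate`, and `ParquetFilters.createFilter`; only the shape of the fix is faithful to the commit:

```scala
object PushDownSketch {
  sealed trait Expr
  case class And(left: Expr, right: Expr) extends Expr
  case class Leaf(sql: String, convertible: Boolean) extends Expr

  case class PqPred(repr: String)

  def and(l: PqPred, r: PqPred): PqPred = PqPred(s"(${l.repr}) and (${r.repr})")

  // Convert one Catalyst-like expression; an And converts only if BOTH sides do.
  def tryConvert(e: Expr): Option[PqPred] = e match {
    case Leaf(s, true)  => Some(PqPred(s))
    case Leaf(_, false) => None
    case And(l, r)      => for (lp <- tryConvert(l); rp <- tryConvert(r)) yield and(lp, rp)
  }

  def main(args: Array[String]): Unit = {
    val predicates: Seq[Expr] = Seq(
      Leaf("a > 1", convertible = true),
      Leaf("udf(b)", convertible = false),
      Leaf("c = 3", convertible = true))

    // Old behavior: AND all predicates first, then convert the combined tree.
    // One unconvertible leaf poisons the whole conversion, so nothing is pushed down.
    val before = predicates.reduceOption(And(_, _)).flatMap(tryConvert)
    println(before) // None

    // Fixed behavior: convert each predicate individually, keep the convertible
    // ones, and AND those together (mirrors flatMap(createFilter) followed by
    // reduceOption(FilterApi.and) in the diff below).
    val after = predicates.flatMap(tryConvert).reduceOption(and)
    println(after) // Some(PqPred((a > 1) and (c = 3)))
  }
}
```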

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4255)
<!-- Reviewable:end -->

Author: Cheng Lian <lian@databricks.com>

Closes #4255 from liancheng/spark-5465 and squashes the following commits:

14ccd37 [Cheng Lian] Fixes filter push-down for Parquet data source
parent 8cf4a1f0
@@ -20,26 +20,26 @@ import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
 
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce.{JobContext, InputSplit, Job}
+import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
+import parquet.filter2.predicate.FilterApi
 import parquet.hadoop.ParquetInputFormat
 import parquet.hadoop.util.ContextUtil
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.{Partition => SparkPartition, Logging}
 import org.apache.spark.rdd.{NewHadoopPartition, RDD}
-import org.apache.spark.sql.{SQLConf, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+import org.apache.spark.{Logging, Partition => SparkPartition}
 
 /**
  * Allows creation of parquet based tables using the syntax
- * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`.  Currently the only option
+ * `CREATE TEMPORARY TABLE ... USING org.apache.spark.sql.parquet`. Currently the only option
  * required is `path`, which should be the location of a collection of, optionally partitioned,
  * parquet files.
  */
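
For orientation, the doc comment above describes the data source this commit touches. A hedged usage sketch follows; the table name and path are illustrative, and `sqlContext` is assumed to be a Spark 1.x `SQLContext`:

```scala
// Register a temporary table backed by (optionally partitioned) Parquet files.
// Table name and path are illustrative, not from the commit.
sqlContext.sql("""
  CREATE TEMPORARY TABLE events
  USING org.apache.spark.sql.parquet
  OPTIONS (path '/data/events')
""")

// Filters on the registered table can then be pushed down to Parquet
// when the push-down flag (spark.sql.parquet.filterPushdown) is enabled.
sqlContext.sql("SELECT * FROM events WHERE a > 1").collect()
```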
@@ -193,10 +193,12 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
       org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths(job, selectedFiles: _*)
     }
 
-    // Push down filters when possible
+    // Push down filters when possible. Notice that not all filters can be converted to Parquet
+    // filter predicates. Here we try to convert each individual predicate and only collect the
+    // convertible ones.
     predicates
-      .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
+      .reduceOption(FilterApi.and)
       .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
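
A note on the trailing combinators: `reduceOption` yields an `Option`, so `.filter` and `.foreach` gate the side effect, and the predicate is only registered when a combined filter exists and push-down is enabled. A minimal sketch of that Option-gating idiom, where the flag and the `println` are stand-ins for the real conf check and `ParquetInputFormat.setFilterPredicate`:

```scala
val pushDownEnabled = true                                  // stand-in for sqlContext.conf.parquetFilterPushDown
val combined: Option[String] = Some("(a > 1) and (c = 3)")  // stand-in for the reduced FilterPredicate

combined
  .filter(_ => pushDownEnabled)                     // becomes None when the flag is off
  .foreach(p => println(s"setFilterPredicate($p)")) // runs only if a predicate survived
```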