Commit 8537c00e authored by Reynold Xin, committed by Xiao Li

[SPARK-19987][SQL] Pass all filters into FileIndex

## What changes were proposed in this pull request?
This is a tiny refactoring that also passes the data filters to the FileIndex, so the FileIndex can have a more global view of the predicates.

## How was this patch tested?
The change should be covered by existing test cases.

Author: Reynold Xin <rxin@databricks.com>

Closes #17322 from rxin/SPARK-19987.
parent 4c320054
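
The core of the patch is the `FileIndex.listFiles` signature, which now receives the data filters alongside the partition filters. Below is a simplified sketch of the before/after shape of the trait (other members such as `rootPaths`, `inputFiles`, `sizeInBytes` and `refresh` are omitted here; see the full diff for the real scaladoc):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.PartitionDirectory

trait FileIndex {
  // Before: only partition-pruning predicates reached the index.
  // def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]

  // After: filters on non-partition columns are passed through as well,
  // giving an implementation a global view of the scan's predicates. It
  // may use dataFilters for best-effort pruning but is not required to,
  // since the execution engine re-applies those filters on the result.
  def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory]
}
```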
@@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
@@ -135,7 +135,7 @@ case class RowDataSourceScanExec(
* @param output Output attributes of the scan.
* @param outputSchema Output schema of the scan.
* @param partitionFilters Predicates to use for partition pruning.
* @param dataFilters Data source filters to use for filtering data within partitions.
* @param dataFilters Filters on non-partition columns.
* @param metastoreTableIdentifier identifier for the table in the metastore.
*/
case class FileSourceScanExec(
@@ -143,7 +143,7 @@ case class FileSourceScanExec(
output: Seq[Attribute],
outputSchema: StructType,
partitionFilters: Seq[Expression],
dataFilters: Seq[Filter],
dataFilters: Seq[Expression],
override val metastoreTableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
@@ -156,7 +156,8 @@ case class FileSourceScanExec(
false
}
@transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
@transient private lazy val selectedPartitions =
relation.location.listFiles(partitionFilters, dataFilters)
override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -225,6 +226,10 @@ case class FileSourceScanExec(
}
}
@transient
private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
// These metadata values make scan plans uniquely identifiable for equality checking.
override val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
@@ -237,7 +242,7 @@ case class FileSourceScanExec(
"ReadSchema" -> outputSchema.catalogString,
"Batched" -> supportsBatch.toString,
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(dataFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"Location" -> locationDesc)
val withOptPartitionCount =
relation.partitionSchemaOption.map { _ =>
@@ -255,7 +260,7 @@ case class FileSourceScanExec(
dataSchema = relation.dataSchema,
partitionSchema = relation.partitionSchema,
requiredSchema = outputSchema,
filters = dataFilters,
filters = pushedDownFilters,
options = relation.options,
hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
......
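
With the scan node now holding Catalyst `Expression`s, the translation into data source `Filter`s (the `pushedDownFilters` value above) happens inside `FileSourceScanExec` rather than in `FileSourceStrategy` (see the hunks further down). A rough illustration of what that translation step does, using a made-up integer column `a`; note that `DataSourceStrategy.translateFilter` is internal to Spark's `sql` packages, so this is a sketch of its behaviour rather than user-facing API:

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.IntegerType

// Hypothetical non-partition column `a` and the Catalyst predicate `a = 1`.
val a = AttributeReference("a", IntegerType)()
val predicate = EqualTo(a, Literal(1))

// Catalyst Expression -> data source Filter; predicates with no
// source-level equivalent translate to None and are simply not pushed down.
val pushed: Option[sources.Filter] = DataSourceStrategy.translateFilter(predicate)
// pushed == Some(sources.EqualTo("a", 1))
```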
@@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery(
relation match {
case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
val partitionData = fsRelation.location.listFiles(filters = Nil)
val partitionData = fsRelation.location.listFiles(Nil, Nil)
LocalRelation(partAttrs, partitionData.map(_.values))
case relation: CatalogRelation =>
......
@@ -54,8 +54,9 @@ class CatalogFileIndex(
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(filters).listFiles(Nil)
override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
}
override def refresh(): Unit = fileStatusCache.invalidateAll()
......
@@ -46,12 +46,17 @@ trait FileIndex {
* Returns all valid files grouped into partitions when the data is partitioned. If the data is
* unpartitioned, this will return a single partition with no partition values.
*
* @param filters The filters used to prune which partitions are returned. These filters must
* only refer to partition columns and this method will only return files
* where these predicates are guaranteed to evaluate to `true`. Thus, these
* filters will not need to be evaluated again on the returned data.
* @param partitionFilters The filters used to prune which partitions are returned. These filters
* must only refer to partition columns and this method will only return
* files where these predicates are guaranteed to evaluate to `true`.
* Thus, these filters will not need to be evaluated again on the
* returned data.
* @param dataFilters Filters that can be applied on non-partitioned columns. The implementation
* does not need to guarantee these filters are applied, i.e. the execution
* engine will ensure these filters are still applied on the returned files.
*/
def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
/**
* Returns the list of files that will be read when scanning this relation. This call may be
......
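
The contract for the new argument is deliberately loose: `dataFilters` is advisory, because the engine re-evaluates those predicates on whatever files come back. That is what allows a `FileIndex` implementation to do best-effort, file-level skipping on top of partition pruning. A hypothetical sketch of such an index follows; the wrapper, the `ColumnStats` type and the `statsFor` function are invented for illustration and only recognize `col > literal`:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, GreaterThan, Literal}
import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// Hypothetical per-file column statistics, e.g. collected out of band.
case class ColumnStats(min: Int, max: Int)

class StatsAwareFileIndex(
    underlying: FileIndex,
    statsFor: FileStatus => Map[String, ColumnStats]) extends FileIndex {

  override def rootPaths: Seq[Path] = underlying.rootPaths
  override def partitionSchema: StructType = underlying.partitionSchema
  override def inputFiles: Array[String] = underlying.inputFiles
  override def sizeInBytes: Long = underlying.sizeInBytes
  override def refresh(): Unit = underlying.refresh()

  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    val partitions = underlying.listFiles(partitionFilters, dataFilters)
    // Best effort: drop files whose stats prove a data filter can never match.
    // Keeping too many files is always safe; the filters run again downstream.
    partitions.map(p => p.copy(files = p.files.filter(f => mightMatch(f, dataFilters))))
  }

  // Only `col > literal` over int columns is recognized; anything else keeps the file.
  private def mightMatch(file: FileStatus, filters: Seq[Expression]): Boolean = {
    val stats = statsFor(file)
    filters.forall {
      case GreaterThan(a: Attribute, Literal(v: Int, _)) =>
        stats.get(a.name).forall(_.max > v)
      case _ => true
    }
  }
}
```

Because filtering here can only shrink the file list, a wrapper like this improves I/O without affecting correctness.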
@@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging {
val outputSchema = readDataColumns.toStructType
logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
val outputAttributes = readDataColumns ++ partitionColumns
val scan =
@@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging {
outputAttributes,
outputSchema,
partitionKeyFilters.toSeq,
pushedDownFilters,
dataFilters,
table.map(_.identifier))
val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
......
@@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex(
override def partitionSchema: StructType = partitionSpec().partitionColumns
protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected val hadoopConf: Configuration =
sparkSession.sessionState.newHadoopConfWithOptions(parameters)
protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
prunePartitions(partitionFilters, partitionSpec()).map {
case PartitionPath(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
......
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive
import java.net.URI
import scala.util.control.NonFatal
import com.google.common.util.concurrent.Striped
@@ -248,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
.inferSchema(
sparkSession,
options,
fileIndex.listFiles(Nil).flatMap(_.files))
fileIndex.listFiles(Nil, Nil).flatMap(_.files))
.map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
inferredSchema match {
......