diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 8ebad676ca310635bf542fef0765ae8513053b62..bfe9c8e351abcc9d953406516f22c0355245e491 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -23,18 +23,18 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
 trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
@@ -135,7 +135,7 @@ case class RowDataSourceScanExec(
  * @param output Output attributes of the scan.
  * @param outputSchema Output schema of the scan.
  * @param partitionFilters Predicates to use for partition pruning.
- * @param dataFilters Data source filters to use for filtering data within partitions.
+ * @param dataFilters Filters on non-partition columns.
  * @param metastoreTableIdentifier identifier for the table in the metastore.
  */
 case class FileSourceScanExec(
@@ -143,7 +143,7 @@ case class FileSourceScanExec(
     output: Seq[Attribute],
     outputSchema: StructType,
     partitionFilters: Seq[Expression],
-    dataFilters: Seq[Filter],
+    dataFilters: Seq[Expression],
     override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with ColumnarBatchScan  {
 
@@ -156,7 +156,8 @@ case class FileSourceScanExec(
     false
   }
 
-  @transient private lazy val selectedPartitions = relation.location.listFiles(partitionFilters)
+  @transient private lazy val selectedPartitions =
+    relation.location.listFiles(partitionFilters, dataFilters)
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -225,6 +226,10 @@ case class FileSourceScanExec(
     }
   }
 
+  @transient
+  private val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+  logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
+
   // These metadata values make scan plans uniquely identifiable for equality checking.
   override val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
@@ -237,7 +242,7 @@ case class FileSourceScanExec(
         "ReadSchema" -> outputSchema.catalogString,
         "Batched" -> supportsBatch.toString,
         "PartitionFilters" -> seqToString(partitionFilters),
-        "PushedFilters" -> seqToString(dataFilters),
+        "PushedFilters" -> seqToString(pushedDownFilters),
         "Location" -> locationDesc)
     val withOptPartitionCount =
       relation.partitionSchemaOption.map { _ =>
@@ -255,7 +260,7 @@ case class FileSourceScanExec(
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = outputSchema,
-        filters = dataFilters,
+        filters = pushedDownFilters,
         options = relation.options,
         hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
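The new `pushedDownFilters` val above reproduces what `FileSourceStrategy` used to do at planning time: each catalyst `dataFilters` expression is passed through `DataSourceStrategy.translateFilter`, and only the predicates that translate to an `org.apache.spark.sql.sources.Filter` are handed to the file format reader. A minimal sketch of that flatMap-style partial translation, using hypothetical stand-in types rather than Spark's internal classes:

```scala
// Sketch only (not Spark code): CatPredicate, SrcFilter and translate are hypothetical
// stand-ins for catalyst Expression, sources.Filter and DataSourceStrategy.translateFilter.
object PushdownSketch extends App {
  sealed trait CatPredicate
  case class CatEqualTo(column: String, value: Any) extends CatPredicate
  case class CatIsNotNull(column: String) extends CatPredicate
  case class CatOpaque(description: String) extends CatPredicate // e.g. a UDF call

  sealed trait SrcFilter
  case class SrcEqualTo(column: String, value: Any) extends SrcFilter
  case class SrcIsNotNull(column: String) extends SrcFilter

  // Translation is partial: predicates the data source API cannot express yield None.
  def translate(p: CatPredicate): Option[SrcFilter] = p match {
    case CatEqualTo(c, v) => Some(SrcEqualTo(c, v))
    case CatIsNotNull(c)  => Some(SrcIsNotNull(c))
    case _: CatOpaque     => None
  }

  val dataFilters: Seq[CatPredicate] =
    Seq(CatEqualTo("a", 1), CatIsNotNull("b"), CatOpaque("myUdf(c) > 3"))

  // Mirrors `dataFilters.flatMap(DataSourceStrategy.translateFilter)` above:
  // untranslatable predicates simply drop out of the pushed-down list, while the
  // Filter operator above the scan still evaluates every original predicate.
  val pushedDownFilters: Seq[SrcFilter] = dataFilters.flatMap(translate)
  println(pushedDownFilters) // List(SrcEqualTo(a,1), SrcIsNotNull(b))
}
```
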
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 769deb1890b6df26b32157b3df9809ca877b062a..3c046ce49428599c01590c4ec2d0e633860fb1d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -98,7 +98,7 @@ case class OptimizeMetadataOnlyQuery(
         relation match {
           case l @ LogicalRelation(fsRelation: HadoopFsRelation, _, _) =>
             val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l)
-            val partitionData = fsRelation.location.listFiles(filters = Nil)
+            val partitionData = fsRelation.location.listFiles(Nil, Nil)
             LocalRelation(partAttrs, partitionData.map(_.values))
 
           case relation: CatalogRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index d6c4b97ebd080bf9e6e0c53ae55201a7ced77399..db0254f8d5581b922bc2a675b00ee47cb4c319e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -54,8 +54,9 @@ class CatalogFileIndex(
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
-    filterPartitions(filters).listFiles(Nil)
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    filterPartitions(partitionFilters).listFiles(Nil, dataFilters)
   }
 
   override def refresh(): Unit = fileStatusCache.invalidateAll()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 277223d52ec52dd0cc4add6548021a98a5aa1519..6b99d38fe5729b7b6b707d6e4c1a1d8b2d3189f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -46,12 +46,17 @@ trait FileIndex {
    * Returns all valid files grouped into partitions when the data is partitioned. If the data is
    * unpartitioned, this will return a single partition with no partition values.
    *
-   * @param filters The filters used to prune which partitions are returned.  These filters must
-   *                only refer to partition columns and this method will only return files
-   *                where these predicates are guaranteed to evaluate to `true`.  Thus, these
-   *                filters will not need to be evaluated again on the returned data.
+   * @param partitionFilters The filters used to prune which partitions are returned. These filters
+   *                         must only refer to partition columns and this method will only return
+   *                         files where these predicates are guaranteed to evaluate to `true`.
+   *                         Thus, these filters will not need to be evaluated again on the
+   *                         returned data.
+   * @param dataFilters Filters that can be applied on non-partition columns. The implementation
+   *                    does not need to guarantee that these filters are applied: the execution
+   *                    engine will re-evaluate them on the rows read from the returned files.
    */
-  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+  def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
 
   /**
    * Returns the list of files that will be read when scanning this relation. This call may be
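
The scaladoc for `listFiles` above spells out an asymmetric contract for the two parameter lists. The following is a minimal sketch of that contract, not Spark code: `SimplePartition`, `SimpleFileIndex` and the function-typed predicates are hypothetical stand-ins for `PartitionDirectory`, `FileIndex` and catalyst `Expression`s, used only to show that `partitionFilters` must be honored exactly while `dataFilters` may be ignored.

```scala
// Sketch only (not Spark code): simplified stand-ins for PartitionDirectory / FileIndex.
object ListFilesSketch extends App {
  case class SimplePartition(values: Map[String, Any], files: Seq[String])

  trait SimpleFileIndex {
    def allPartitions: Seq[SimplePartition]

    def listFiles(
        partitionFilters: Seq[Map[String, Any] => Boolean],
        dataFilters: Seq[Map[String, Any] => Boolean]): Seq[SimplePartition] = {
      // partitionFilters must be honored exactly: only partitions on which every
      // predicate holds may be returned, so they are never re-evaluated downstream.
      val selected = allPartitions.filter(p => partitionFilters.forall(f => f(p.values)))
      // dataFilters are advisory: returning extra files is fine because the execution
      // engine re-evaluates them on the rows it reads. A smarter index could use them,
      // e.g. to skip files based on min/max statistics.
      selected
    }
  }

  object TableIndex extends SimpleFileIndex {
    val allPartitions = Seq(
      SimplePartition(Map("dt" -> "2017-01-01"), Seq("part-0.parquet")),
      SimplePartition(Map("dt" -> "2017-01-02"), Seq("part-1.parquet")))
  }

  // Only the dt=2017-01-02 partition comes back; the data filter is ignored here.
  println(TableIndex.listFiles(
    partitionFilters = Seq(m => m("dt") == "2017-01-02"),
    dataFilters = Seq(_ => true)))
}
```

This is also why, in the hunks below, `PartitioningAwareFileIndex` can keep ignoring `dataFilters` entirely while `CatalogFileIndex` simply forwards them.
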
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 26e1380eca4993672df3889d46c584e1f89b346c..17f7e0e601c0ce7639e9c774303e2e9eef7f46db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -100,9 +100,6 @@ object FileSourceStrategy extends Strategy with Logging {
       val outputSchema = readDataColumns.toStructType
       logInfo(s"Output Data Schema: ${outputSchema.simpleString(5)}")
 
-      val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
-      logInfo(s"Pushed Filters: ${pushedDownFilters.mkString(",")}")
-
       val outputAttributes = readDataColumns ++ partitionColumns
 
       val scan =
@@ -111,7 +108,7 @@ object FileSourceStrategy extends Strategy with Logging {
           outputAttributes,
           outputSchema,
           partitionKeyFilters.toSeq,
-          pushedDownFilters,
+          dataFilters,
           table.map(_.identifier))
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index db8bbc52aaf4deedc858a6a35d54fbed426bd2da..71500a010581e0617b23e106aa153a7c3734f806 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -54,17 +54,19 @@ abstract class PartitioningAwareFileIndex(
 
   override def partitionSchema: StructType = partitionSpec().partitionColumns
 
-  protected val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(parameters)
+  protected val hadoopConf: Configuration =
+    sparkSession.sessionState.newHadoopConfWithOptions(parameters)
 
   protected def leafFiles: mutable.LinkedHashMap[Path, FileStatus]
 
   protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]
 
-  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+  override def listFiles(
+      partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
       PartitionDirectory(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
-      prunePartitions(filters, partitionSpec()).map {
+      prunePartitions(partitionFilters, partitionSpec()).map {
         case PartitionPath(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 9f0d1ceb28fcaaa76fbb07fbd8d2331f813b0f1d..2e060ab9f680169a1f79bf0e689b79d36042aa04 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive
 
-import java.net.URI
-
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.Striped
@@ -248,7 +246,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         .inferSchema(
           sparkSession,
           options,
-          fileIndex.listFiles(Nil).flatMap(_.files))
+          fileIndex.listFiles(Nil, Nil).flatMap(_.files))
         .map(mergeWithMetastoreSchema(relation.tableMeta.schema, _))
 
       inferredSchema match {