diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 239151495f4bd7b706c737b45d0013375db8f386..2fa660c4d5e018f75703f9f3779a3b873fc04b13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -172,12 +172,16 @@ case class FileSourceScanExec(
   }
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
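+    // Fold in any file-listing time already spent during query optimization (see
+    // FileIndex.metadataOpsTimeNs), so the "metadataTime" metric reflects the full listing cost.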
+    val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
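+    // Total listing time (this call plus the optimizer-side listing), converted to milliseconds.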
+    val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000
 
     metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTaken)
+    metrics("metadataTime").add(timeTakenMs)
 
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index db0254f8d5581b922bc2a675b00ee47cb4c319e2..4046396d0e6149f667f705ce6aca93734a8cb8a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -69,6 +69,8 @@
    */
   def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
     if (table.partitionColumnNames.nonEmpty) {
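+      // Record when catalog partition listing starts so its duration can be reported as a metric.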
+      val startTime = System.nanoTime()
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
@@ -79,8 +81,11 @@
           path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
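+      // Time spent listing and pruning partitions through the catalog; exposed to the physical
+      // plan via FileIndex.metadataOpsTimeNs.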
+      val timeNs = System.nanoTime() - startTime
       new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs))
     } else {
       new InMemoryFileIndex(
         sparkSession, rootPaths, table.storage.properties, partitionSchema = None)
@@ -111,7 +116,9 @@ private class PrunedInMemoryFileIndex(
     sparkSession: SparkSession,
     tableBasePath: Path,
     fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec)
+    override val partitionSpec: PartitionSpec,
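+    // Time spent on catalog partition listing during optimization, if it was measured.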
+    override val metadataOpsTimeNs: Option[Long])
   extends InMemoryFileIndex(
     sparkSession,
     partitionSpec.partitions.map(_.path),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 6b99d38fe5729b7b6b707d6e4c1a1d8b2d3189f8..094a66a2820f3a29601375032f92066ded18320b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -72,4 +72,14 @@ trait FileIndex {
 
   /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */
   def partitionSchema: StructType
+
+  /**
+   * Returns the time, in nanoseconds, spent on metadata operations for file listing, if known.
+   *
+   * File listing happens during query optimization (to obtain proper statistics), but its cost
+   * should also be reported in physical execution (as metrics). To support that, some
+   * implementations save the file listing time, and physical execution calls this method to
+   * retrieve it and update the metrics.
+   */
+  def metadataOpsTimeNs: Option[Long] = None
 }