From 90d3b91f4cb59d84fea7105d54ef8c87a7d5c6a2 Mon Sep 17 00:00:00 2001
From: Eric Liang <ekl@databricks.com>
Date: Sun, 30 Oct 2016 13:14:45 -0700
Subject: [PATCH] [SPARK-18103][SQL] Rename *FileCatalog to *FileIndex

## What changes were proposed in this pull request?

To reduce the number of SQL components named *Catalog, this patch renames *FileCatalog to *FileIndex. A FileIndex is responsible for returning the list of partitions and files to scan given a filtering expression.

```
TableFileCatalog => CatalogFileIndex
FileCatalog => FileIndex
ListingFileCatalog => InMemoryFileIndex
MetadataLogFileCatalog => MetadataLogFileIndex
PrunedTableFileCatalog => PrunedInMemoryFileIndex
```
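
For example, an implementation such as `InMemoryFileIndex` (formerly `ListingFileCatalog`) can be constructed and queried roughly as follows. This is an illustrative sketch only: the table path is hypothetical and constructor signatures may differ in other Spark versions.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Eagerly lists and caches the files found under the given root paths.
val index = new InMemoryFileIndex(
  spark,
  Seq(new Path("/tmp/some/table")),  // hypothetical table location
  Map.empty[String, String],         // data source options
  None)                              // no user-specified partition schema

// A FileIndex can enumerate all files and report the inferred partition spec.
index.allFiles().foreach(f => println(f.getPath))
println(index.partitionSpec())
```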

cc yhuai marmbrus

## How was this patch tested?

N/A

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15634 from ericl/rename-file-provider.
---
 .../spark/metrics/source/StaticSources.scala  |  2 +-
 .../spark/sql/execution/CacheManager.scala    |  2 +-
 ...leCatalog.scala => CatalogFileIndex.scala} | 24 ++++++-------
 .../execution/datasources/DataSource.scala    | 10 +++---
 .../{FileCatalog.scala => FileIndex.scala}    |  2 +-
 .../datasources/HadoopFsRelation.scala        |  4 +--
 ...eCatalog.scala => InMemoryFileIndex.scala} |  8 ++---
 ...scala => PartitioningAwareFileIndex.scala} | 16 ++++-----
 .../PruneFileSourcePartitions.scala           |  6 ++--
 .../streaming/CompactibleFileStreamLog.scala  |  4 +--
 .../streaming/FileStreamSource.scala          |  4 +--
 .../streaming/MetadataLogFileCatalog.scala    |  6 ++--
 .../datasources/FileCatalogSuite.scala        | 36 +++++++++----------
 .../datasources/FileSourceStrategySuite.scala |  2 +-
 .../ParquetPartitionDiscoverySuite.scala      |  2 +-
 .../sql/streaming/FileStreamSinkSuite.scala   |  6 ++--
 .../sql/streaming/FileStreamSourceSuite.scala |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala |  4 +--
 .../spark/sql/hive/CachedTableSuite.scala     | 10 +++---
 .../hive/PartitionedTablePerfStatsSuite.scala |  2 +-
 .../PruneFileSourcePartitionsSuite.scala      |  6 ++--
 21 files changed, 79 insertions(+), 79 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{TableFileCatalog.scala => CatalogFileIndex.scala} (83%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{FileCatalog.scala => FileIndex.scala} (99%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{ListingFileCatalog.scala => InMemoryFileIndex.scala} (92%)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/{PartitioningAwareFileCatalog.scala => PartitioningAwareFileIndex.scala} (96%)

diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b54885b7ff..3f7cfd9d2c 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -76,7 +76,7 @@ object HiveCatalogMetrics extends Source {
   val METRIC_PARTITIONS_FETCHED = metricRegistry.counter(MetricRegistry.name("partitionsFetched"))
 
   /**
-   * Tracks the total number of files discovered off of the filesystem by ListingFileCatalog.
+   * Tracks the total number of files discovered off of the filesystem by InMemoryFileIndex.
    */
   val METRIC_FILES_DISCOVERED = metricRegistry.counter(MetricRegistry.name("filesDiscovered"))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index fb72c679e3..526623a36d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -177,7 +177,7 @@ class CacheManager extends Logging {
 
   /**
    * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
-   * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+   * [[org.apache.spark.sql.execution.datasources.FileIndex]] of any [[HadoopFsRelation]] nodes
    * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
    * false.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
similarity index 83%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index b459df5734..092aabc89a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -26,23 +26,23 @@ import org.apache.spark.sql.types.StructType
 
 
 /**
- * A [[FileCatalog]] for a metastore catalog table.
+ * A [[FileIndex]] for a metastore catalog table.
  *
  * @param sparkSession a [[SparkSession]]
  * @param table the metadata of the table
  * @param sizeInBytes the table's data size in bytes
  */
-class TableFileCatalog(
+class CatalogFileIndex(
     sparkSession: SparkSession,
     val table: CatalogTable,
-    override val sizeInBytes: Long) extends FileCatalog {
+    override val sizeInBytes: Long) extends FileIndex {
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
   private val fileStatusCache = FileStatusCache.newCache(sparkSession)
 
   assert(table.identifier.database.isDefined,
-    "The table identifier must be qualified in TableFileCatalog")
+    "The table identifier must be qualified in CatalogFileIndex")
 
   private val baseLocation = table.storage.locationUri
 
@@ -57,12 +57,12 @@ class TableFileCatalog(
   override def refresh(): Unit = fileStatusCache.invalidateAll()
 
   /**
-   * Returns a [[ListingFileCatalog]] for this table restricted to the subset of partitions
+   * Returns a [[InMemoryFileIndex]] for this table restricted to the subset of partitions
    * specified by the given partition-pruning filters.
    *
    * @param filters partition-pruning filters
    */
-  def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
+  def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
     if (table.partitionColumnNames.nonEmpty) {
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
@@ -70,20 +70,20 @@ class TableFileCatalog(
         PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
-      new PrunedTableFileCatalog(
+      new PrunedInMemoryFileIndex(
         sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
     } else {
-      new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
+      new InMemoryFileIndex(sparkSession, rootPaths, table.storage.properties, None)
     }
   }
 
   override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
 
-  // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+  // `CatalogFileIndex` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
   // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
   // implement `equals` and `hashCode` here, to make it work with cache lookup.
   override def equals(o: Any): Boolean = o match {
-    case other: TableFileCatalog => this.table.identifier == other.table.identifier
+    case other: CatalogFileIndex => this.table.identifier == other.table.identifier
     case _ => false
   }
 
@@ -97,12 +97,12 @@ class TableFileCatalog(
  * @param tableBasePath The default base path of the Hive metastore table
  * @param partitionSpec The partition specifications from Hive metastore
  */
-private class PrunedTableFileCatalog(
+private class PrunedInMemoryFileIndex(
     sparkSession: SparkSession,
     tableBasePath: Path,
     fileStatusCache: FileStatusCache,
     override val partitionSpec: PartitionSpec)
-  extends ListingFileCatalog(
+  extends InMemoryFileIndex(
     sparkSession,
     partitionSpec.partitions.map(_.path),
     Map.empty,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 5b8f05a396..996109865f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -202,7 +202,7 @@ case class DataSource(
         val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
-      val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
+      val fileCatalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
       val partitionSchema = fileCatalog.partitionSpec().partitionColumns
       val inferred = format.inferSchema(
         sparkSession,
@@ -364,7 +364,7 @@ case class DataSource(
       case (format: FileFormat, _)
           if hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog = new MetadataLogFileCatalog(sparkSession, basePath)
+        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
@@ -417,12 +417,12 @@ case class DataSource(
 
         val fileCatalog = if (sparkSession.sqlContext.conf.manageFilesourcePartitions &&
             catalogTable.isDefined && catalogTable.get.partitionProviderIsHive) {
-          new TableFileCatalog(
+          new CatalogFileIndex(
             sparkSession,
             catalogTable.get,
             catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L))
         } else {
-          new ListingFileCatalog(
+          new InMemoryFileIndex(
             sparkSession, globbedPaths, options, partitionSchema)
         }
 
@@ -433,7 +433,7 @@ case class DataSource(
           format.inferSchema(
             sparkSession,
             caseInsensitiveOptions,
-            fileCatalog.asInstanceOf[ListingFileCatalog].allFiles())
+            fileCatalog.asInstanceOf[InMemoryFileIndex].allFiles())
         }.getOrElse {
           throw new AnalysisException(
             s"Unable to infer schema for $format at ${allPaths.take(2).mkString(",")}. " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
similarity index 99%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index dba64624c3..277223d52e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -33,7 +33,7 @@ case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
  * An interface for objects capable of enumerating the root paths of a relation as well as the
  * partitions of a relation subject to some pruning expressions.
  */
-trait FileCatalog {
+trait FileIndex {
 
   /**
    * Returns the list of root input paths from which the catalog will get files. There may be a
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index afad889808..014abd454f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.types.StructType
  * Acts as a container for all of the metadata required to read from a datasource. All discovery,
  * resolution and merging logic for schemas and partitions has been removed.
  *
- * @param location A [[FileCatalog]] that can enumerate the locations of all the files that
+ * @param location A [[FileIndex]] that can enumerate the locations of all the files that
  *                 comprise this relation.
  * @param partitionSchema The schema of the columns (if any) that are used to partition the relation
  * @param dataSchema The schema of any remaining columns.  Note that if any partition columns are
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
  * @param options Configuration used when reading / writing data.
  */
 case class HadoopFsRelation(
-    location: FileCatalog,
+    location: FileIndex,
     partitionSchema: StructType,
     dataSchema: StructType,
     bucketSpec: Option[BucketSpec],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
similarity index 92%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index d9d588388a..7531f0ae02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.types.StructType
 
 
 /**
- * A [[FileCatalog]] that generates the list of files to process by recursively listing all the
+ * A [[FileIndex]] that generates the list of files to process by recursively listing all the
  * files present in `paths`.
  *
  * @param rootPaths the list of root table paths to scan
@@ -34,13 +34,13 @@ import org.apache.spark.sql.types.StructType
  * @param partitionSchema an optional partition schema that will be use to provide types for the
  *                        discovered partitions
  */
-class ListingFileCatalog(
+class InMemoryFileIndex(
     sparkSession: SparkSession,
     override val rootPaths: Seq[Path],
     parameters: Map[String, String],
     partitionSchema: Option[StructType],
     fileStatusCache: FileStatusCache = NoopCache)
-  extends PartitioningAwareFileCatalog(
+  extends PartitioningAwareFileIndex(
     sparkSession, parameters, partitionSchema, fileStatusCache) {
 
   @volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -79,7 +79,7 @@ class ListingFileCatalog(
   }
 
   override def equals(other: Any): Boolean = other match {
-    case hdfs: ListingFileCatalog => rootPaths.toSet == hdfs.rootPaths.toSet
+    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
     case _ => false
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
similarity index 96%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index cc4049e925..a8a722dd3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -34,19 +34,19 @@ import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 /**
- * An abstract class that represents [[FileCatalog]]s that are aware of partitioned tables.
+ * An abstract class that represents [[FileIndex]]s that are aware of partitioned tables.
  * It provides the necessary methods to parse partition data based on a set of files.
  *
  * @param parameters as set of options to control partition discovery
  * @param userPartitionSchema an optional partition schema that will be use to provide types for
  *                            the discovered partitions
  */
-abstract class PartitioningAwareFileCatalog(
+abstract class PartitioningAwareFileIndex(
     sparkSession: SparkSession,
     parameters: Map[String, String],
     userPartitionSchema: Option[StructType],
-    fileStatusCache: FileStatusCache = NoopCache) extends FileCatalog with Logging {
-  import PartitioningAwareFileCatalog.BASE_PATH_PARAM
+    fileStatusCache: FileStatusCache = NoopCache) extends FileIndex with Logging {
+  import PartitioningAwareFileIndex.BASE_PATH_PARAM
 
   /** Returns the specification of the partitions inferred from the data. */
   def partitionSpec(): PartitionSpec
@@ -253,9 +253,9 @@ abstract class PartitioningAwareFileCatalog(
     }
     val discovered = if (pathsToFetch.length >=
         sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileCatalog.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
+      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
     } else {
-      PartitioningAwareFileCatalog.listLeafFilesInSerial(pathsToFetch, hadoopConf)
+      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
     }
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
@@ -266,7 +266,7 @@ abstract class PartitioningAwareFileCatalog(
   }
 }
 
-object PartitioningAwareFileCatalog extends Logging {
+object PartitioningAwareFileIndex extends Logging {
   val BASE_PATH_PARAM = "basePath"
 
   /** A serializable variant of HDFS's BlockLocation. */
@@ -383,7 +383,7 @@ object PartitioningAwareFileCatalog extends Logging {
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
     } else {
-      // [SPARK-17599] Prevent ListingFileCatalog from failing if path doesn't exist
+      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't exist
       // Note that statuses only include FileStatus for the files and dirs directly under path,
       // and does not include anything else recursively.
       val statuses = try fs.listStatus(path) catch {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 8689017c3e..8566a80610 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -28,7 +28,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         logicalRelation @
           LogicalRelation(fsRelation @
             HadoopFsRelation(
-              tableFileCatalog: TableFileCatalog,
+              catalogFileIndex: CatalogFileIndex,
               partitionSchema,
               _,
               _,
@@ -56,9 +56,9 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
         ExpressionSet(normalizedFilters.filter(_.references.subsetOf(partitionSet)))
 
       if (partitionKeyFilters.nonEmpty) {
-        val prunedFileCatalog = tableFileCatalog.filterPartitions(partitionKeyFilters.toSeq)
+        val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
         val prunedFsRelation =
-          fsRelation.copy(location = prunedFileCatalog)(sparkSession)
+          fsRelation.copy(location = prunedFileIndex)(sparkSession)
         val prunedLogicalRelation = logicalRelation.copy(
           relation = prunedFsRelation,
           expectedOutputAttributes = Some(logicalRelation.output))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
index c14feea91e..b26edeeb04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
@@ -146,7 +146,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
    */
   def allFiles(): Array[T] = {
     var latestId = getLatest().map(_._1).getOrElse(-1L)
-    // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileCatalog`
+    // There is a race condition when `FileStreamSink` is deleting old files and `StreamFileIndex`
     // is calling this method. This loop will retry the reading to deal with the
     // race condition.
     while (true) {
@@ -158,7 +158,7 @@ abstract class CompactibleFileStreamLog[T: ClassTag](
         } catch {
           case e: IOException =>
             // Another process using `CompactibleFileStreamLog` may delete the batch files when
-            // `StreamFileCatalog` are reading. However, it only happens when a compaction is
+            // `StreamFileIndex` are reading. However, it only happens when a compaction is
             // deleting old files. If so, let's try the next compaction batch and we should find it.
             // Otherwise, this is a real IO issue and we should throw it.
             latestId = nextCompactionBatchId(latestId, compactInterval)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a392b82999..680df01acc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -156,7 +156,7 @@ class FileStreamSource(
   private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
     val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
-    val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
+    val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType))
     val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
       (status.getPath.toUri.toString, status.getModificationTime)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
index 82b67cb1ca..aeaa134736 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.datasources._
 
 
 /**
- * A [[FileCatalog]] that generates the list of files to processing by reading them from the
+ * A [[FileIndex]] that generates the list of files to processing by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
  */
-class MetadataLogFileCatalog(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileCatalog(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
index 9c43169cbf..56df1face6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
@@ -28,15 +28,15 @@ import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
 
-class FileCatalogSuite extends SharedSQLContext {
+class FileIndexSuite extends SharedSQLContext {
 
-  test("ListingFileCatalog: leaf files are qualified paths") {
+  test("InMemoryFileIndex: leaf files are qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
 
       val path = new Path(file.getCanonicalPath)
-      val catalog = new ListingFileCatalog(spark, Seq(path), Map.empty, None) {
+      val catalog = new InMemoryFileIndex(spark, Seq(path), Map.empty, None) {
         def leafFilePaths: Seq[Path] = leafFiles.keys.toSeq
         def leafDirPaths: Seq[Path] = leafDirToChildrenFiles.keys.toSeq
       }
@@ -45,7 +45,7 @@ class FileCatalogSuite extends SharedSQLContext {
     }
   }
 
-  test("ListingFileCatalog: input paths are converted to qualified paths") {
+  test("InMemoryFileIndex: input paths are converted to qualified paths") {
     withTempDir { dir =>
       val file = new File(dir, "text.txt")
       stringToFile(file, "text")
@@ -59,42 +59,42 @@ class FileCatalogSuite extends SharedSQLContext {
       val qualifiedFilePath = fs.makeQualified(new Path(file.getCanonicalPath))
       require(qualifiedFilePath.toString.startsWith("file:"))
 
-      val catalog1 = new ListingFileCatalog(
+      val catalog1 = new InMemoryFileIndex(
         spark, Seq(unqualifiedDirPath), Map.empty, None)
       assert(catalog1.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
-      val catalog2 = new ListingFileCatalog(
+      val catalog2 = new InMemoryFileIndex(
         spark, Seq(unqualifiedFilePath), Map.empty, None)
       assert(catalog2.allFiles.map(_.getPath) === Seq(qualifiedFilePath))
 
     }
   }
 
-  test("ListingFileCatalog: folders that don't exist don't throw exceptions") {
+  test("InMemoryFileIndex: folders that don't exist don't throw exceptions") {
     withTempDir { dir =>
       val deletedFolder = new File(dir, "deleted")
       assert(!deletedFolder.exists())
-      val catalog1 = new ListingFileCatalog(
+      val catalog1 = new InMemoryFileIndex(
         spark, Seq(new Path(deletedFolder.getCanonicalPath)), Map.empty, None)
       // doesn't throw an exception
       assert(catalog1.listLeafFiles(catalog1.rootPaths).isEmpty)
     }
   }
 
-  test("PartitioningAwareFileCatalog - file filtering") {
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("abcd"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut(".ab"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd"))
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_metadata"))
-    assert(!PartitioningAwareFileCatalog.shouldFilterOut("_common_metadata"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_ab_metadata"))
-    assert(PartitioningAwareFileCatalog.shouldFilterOut("_cd_common_metadata"))
+  test("PartitioningAwareFileIndex - file filtering") {
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_metadata"))
+    assert(!PartitioningAwareFileIndex.shouldFilterOut("_common_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_ab_metadata"))
+    assert(PartitioningAwareFileIndex.shouldFilterOut("_cd_common_metadata"))
   }
 
-  test("SPARK-17613 - PartitioningAwareFileCatalog: base path w/o '/' at end") {
+  test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
-      extends PartitioningAwareFileCatalog(spark, Map.empty, None) {
+      extends PartitioningAwareFileIndex(spark, Map.empty, None) {
 
       override def refresh(): Unit = {}
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index c32254d9df..d900ce7bb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -393,7 +393,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
           util.stringToFile(file, fileName)
         }
 
-        val fileCatalog = new ListingFileCatalog(
+        val fileCatalog = new InMemoryFileIndex(
           sparkSession = spark,
           rootPaths = Seq(new Path(tempDir)),
           parameters = Map.empty[String, String],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f2a209e919..120a3a2ef3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -634,7 +634,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       val queryExecution = spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
         case LogicalRelation(
-            HadoopFsRelation(location: PartitioningAwareFileCatalog, _, _, _, _, _), _, _) =>
+            HadoopFsRelation(location: PartitioningAwareFileIndex, _, _, _, _, _), _, _) =>
           assert(location.partitionSpec() === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a matching HadoopFsRelation, but got:\n$queryExecution")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 19c89f5c41..18b42a81a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileCatalog}
+import org.apache.spark.sql.execution.streaming.{FileStreamSinkWriter, MemoryStream, MetadataLogFileIndex}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
@@ -179,14 +179,14 @@ class FileStreamSinkSuite extends StreamTest {
         .add(StructField("id", IntegerType))
       assert(outputDf.schema === expectedSchema)
 
-      // Verify that MetadataLogFileCatalog is being used and the correct partitioning schema has
+      // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
       // been inferred
       val hadoopdFsRelations = outputDf.queryExecution.analyzed.collect {
         case LogicalRelation(baseRelation, _, _) if baseRelation.isInstanceOf[HadoopFsRelation] =>
           baseRelation.asInstanceOf[HadoopFsRelation]
       }
       assert(hadoopdFsRelations.size === 1)
-      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileCatalog])
+      assert(hadoopdFsRelations.head.location.isInstanceOf[MetadataLogFileIndex])
       assert(hadoopdFsRelations.head.partitionSchema.exists(_.name == "id"))
       assert(hadoopdFsRelations.head.dataSchema.exists(_.name == "value"))
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index b9e9da9a1e..47018b3a3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -879,7 +879,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     val numFiles = 10000
 
     // This is to avoid running a spark job to list of files in parallel
-    // by the ListingFileCatalog.
+    // by the InMemoryFileIndex.
     spark.sessionState.conf.setConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD, numFiles * 2)
 
     withTempDirs { case (root, tmp) =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d1de863ce3..624ab747e4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -200,7 +200,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val rootPaths: Seq[Path] = if (lazyPruningEnabled) {
         Seq(metastoreRelation.hiveQlTable.getDataLocation)
       } else {
-        // By convention (for example, see TableFileCatalog), the definition of a
+        // By convention (for example, see CatalogFileIndex), the definition of a
         // partitioned table's paths depends on whether that table has any actual partitions.
         // Partitioned tables without partitions use the location of the table's base path.
         // Partitioned tables with partitions use the locations of those partitions' data
@@ -227,7 +227,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       val logicalRelation = cached.getOrElse {
         val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
         val fileCatalog = {
-          val catalog = new TableFileCatalog(
+          val catalog = new CatalogFileIndex(
             sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             catalog
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index ecdf4f14b3..fc35304c80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -321,17 +321,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     sql("DROP TABLE cachedTable")
   }
 
-  test("cache a table using TableFileCatalog") {
+  test("cache a table using CatalogFileIndex") {
     withTable("test") {
       sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
       val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
 
       val dataSchema = StructType(tableMeta.schema.filterNot { f =>
         tableMeta.partitionColumnNames.contains(f.name)
       })
       val relation = HadoopFsRelation(
-        location = tableFileCatalog,
+        location = catalogFileIndex,
         partitionSchema = tableMeta.partitionSchema,
         dataSchema = dataSchema,
         bucketSpec = None,
@@ -343,7 +343,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
       assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
 
-      val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val sameCatalog = new CatalogFileIndex(spark, tableMeta, 0)
       val sameRelation = HadoopFsRelation(
         location = sameCatalog,
         partitionSchema = tableMeta.partitionSchema,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 476383a5b3..d8e31c4e39 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -256,7 +256,7 @@ class PartitionedTablePerfStatsSuite
           // of doing plan cache validation based on the entire partition set.
           HiveCatalogMetrics.reset()
           assert(spark.sql("select * from test where partCol1 = 999").count() == 0)
-          // 5 from table resolution, another 5 from ListingFileCatalog
+          // 5 from table resolution, another 5 from InMemoryFileIndex
           assert(HiveCatalogMetrics.METRIC_PARTITIONS_FETCHED.getCount() == 10)
           assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 5)
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 59639aacf3..cdbc26cd5c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -45,13 +45,13 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
             |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
 
         val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-        val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+        val catalogFileIndex = new CatalogFileIndex(spark, tableMeta, 0)
 
         val dataSchema = StructType(tableMeta.schema.filterNot { f =>
           tableMeta.partitionColumnNames.contains(f.name)
         })
         val relation = HadoopFsRelation(
-          location = tableFileCatalog,
+          location = catalogFileIndex,
           partitionSchema = tableMeta.partitionSchema,
           dataSchema = dataSchema,
           bucketSpec = None,
-- 
GitLab