From 294163ee9319e4f7f6da1259839eb3c80bba25c2 Mon Sep 17 00:00:00 2001
From: Eric Liang <ekl@databricks.com>
Date: Fri, 2 Dec 2016 20:59:39 +0800
Subject: [PATCH] [SPARK-18679][SQL] Fix regression in file listing performance
 for non-catalog tables

## What changes were proposed in this pull request?

In Spark 2.1, ListingFileCatalog was significantly refactored (and renamed to InMemoryFileIndex). This introduced a regression where parallel listing could only kick in at the very top of the directory tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory, so listing fell back to a serial recursive scan on the driver.
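
For a concrete sense of the failure mode (the path below is made up), a read like this has exactly one path at the top of the tree, so the pre-patch code never triggered parallel listing no matter how many partition directories sit underneath it:

```scala
// "/data/events" is a hypothetical path with thousands of nested
// partition directories (e.g. date=.../hour=...). Before this patch,
// the parallel-vs-serial decision was made only for the single
// top-level path, so everything below it was listed serially.
val df = spark.read.parquet("/data/events")
```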

This PR simplifies and fixes the parallel recursive listing code so that parallelism can be introduced at any level of the recursive descent, as illustrated in the sketch below (note that once we decide to list a sub-tree in parallel, that sub-tree is listed serially on executors).
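
To illustrate the strategy, here is a minimal local-filesystem sketch using `Future`s in place of Spark jobs; the real change lives in `bulkListLeafFiles` / `listLeafFiles` in the diff below, and `threshold` corresponds to `spark.sql.sources.parallelPartitionDiscovery.threshold`:

```scala
import java.io.File
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// canGoParallel plays the role of the Option[SparkSession] in the real code:
// the driver passes Some(session) down, executors pass None.
def listLeaves(dir: File, threshold: Int, canGoParallel: Boolean): Seq[File] = {
  val children = Option(dir.listFiles()).map(_.toSeq).getOrElse(Nil)
  val (dirs, files) = children.partition(_.isDirectory)
  val nested =
    if (canGoParallel && dirs.size >= threshold) {
      // Enough sub-directories at this level: list each sub-tree in parallel.
      // A sub-tree that goes parallel is then listed serially inside its task.
      val futures = dirs.map(d => Future(listLeaves(d, threshold, canGoParallel = false)))
      Await.result(Future.sequence(futures), Duration.Inf).flatten
    } else {
      // Too few sub-directories here: recurse serially, but keep the ability
      // to go parallel at a deeper level (this is what the regression broke).
      dirs.flatMap(d => listLeaves(d, threshold, canGoParallel))
    }
  files ++ nested
}
```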

cc mallman cloud-fan

## How was this patch tested?

Added unit tests in `FileIndexSuite` that reset `HiveCatalogMetrics` and assert on the new `parallelListingJobCount` metric for many top-level directories, a single directory with many children, and deeply nested directory trees.
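
The core assertion pattern, condensed from the new `FileIndexSuite` tests in the diff (here `dir` holds more partition sub-directories than the default `spark.sql.sources.parallelPartitionDiscovery.threshold` of 32):

```scala
// Listing a single top-level directory with many child partition dirs
// should now launch exactly one parallel listing job.
HiveCatalogMetrics.reset()
new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
```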

Author: Eric Liang <ekl@databricks.com>

Closes #16112 from ericl/spark-18679.
---
 .../spark/metrics/source/StaticSources.scala  |  8 ++
 .../PartitioningAwareFileIndex.scala          | 79 +++++++++++--------
 ...atalogSuite.scala => FileIndexSuite.scala} | 53 +++++++++++++
 3 files changed, 106 insertions(+), 34 deletions(-)
 rename sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/{FileCatalogSuite.scala => FileIndexSuite.scala} (70%)

diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b433cd0a89..99ec78633a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
    */
   val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
 
+  /**
+   * Tracks the total number of Spark jobs launched for parallel file listing.
+   */
+  val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+    MetricRegistry.name("parallelListingJobCount"))
+
   /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
     METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+    METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
   def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+  def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 705a1e3198..825a0f70dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
           pathsToFetch += path
       }
     }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
       blockLocations: Array[SerializableBlockLocation])
 
   /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+   * on the number of paths to list.
+   *
+   * This may only be called on the driver.
+   *
+   * @return for each input path, the set of discovered files for the path
    */
-  private def listLeafFilesInParallel(
+  private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
+      filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+    // Short-circuits parallel listing when serial listing is likely to be faster.
+    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      return paths.map { path =>
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+      }
+    }
+
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -324,9 +318,11 @@ object PartitioningAwareFileIndex extends Logging {
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
+      .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+        pathStrings.map(new Path(_)).toSeq.map { path =>
+          (path, listLeafFiles(path, hadoopConf, filter, None))
+        }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -374,11 +370,20 @@ object PartitioningAwareFileIndex extends Logging {
   }
 
   /**
-   * List a single path, provided as a FileStatus, in serial.
+   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+   * function may launch Spark jobs to parallelize listing.
+   *
+   * If sessionOpt is None, this may be called on executors.
+   *
+   * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+  private def listLeafFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
+    val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
@@ -393,9 +398,15 @@ object PartitioningAwareFileIndex extends Logging {
       }
 
       val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+        val nestedFiles: Seq[FileStatus] = sessionOpt match {
+          case Some(session) =>
+            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+          case _ =>
+            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        }
+        val allFiles = topLevelFiles ++ nestedFiles
+        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
       }
 
       allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
similarity index 70%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 56df1face6..b7a472b7f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        val topLevelDirs = (1 to scale).map { i =>
+          val tmp = new File(dir, s"foo=$i.txt")
+          tmp.mkdir()
+          new Path(tmp.getCanonicalPath)
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        for (i <- 1 to scale) {
+          new File(dir, s"foo=$i.txt").mkdir()
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+      withTempDir { dir =>
+        for (i <- 1 to 2) {
+          val subdirA = new File(dir, s"a=$i")
+          subdirA.mkdir()
+          for (j <- 1 to 2) {
+            val subdirB = new File(subdirA, s"b=$j")
+            subdirB.mkdir()
+            for (k <- 1 to scale) {
+              new File(subdirB, s"foo=$k.txt").mkdir()
+            }
+          }
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex - file filtering") {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
     assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))
-- 
GitLab