diff --git a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
index b433cd0a89ac92f5cd68c5470fe88b8dae0c417d..99ec78633ab75373cbae1ea90892027adfc1341a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/source/StaticSources.scala
@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
    */
   val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
 
+  /**
+   * Tracks the total number of Spark jobs launched for parallel file listing.
+   */
+  val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+    MetricRegistry.name("parallelListingJobCount"))
+
   /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
     METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+    METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
   def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+  def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3740caa22c37e041daccd203b1765b76d6ae6cd7..f22b55bb0465ee0480867c5791c4a86e9a5d0f8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
           pathsToFetch += path
       }
     }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
       blockLocations: Array[SerializableBlockLocation])
 
   /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+   * on the number of paths to list.
+   *
+   * This may only be called on the driver.
+   *
+   * @return for each input path, the set of discovered files for the path
    */
-  private def listLeafFilesInParallel(
+  private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
+      filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+    // Short-circuits parallel listing when serial listing is likely to be faster.
+    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      return paths.map { path =>
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+      }
+    }
+
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -322,9 +316,11 @@ object PartitioningAwareFileIndex extends Logging {
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
+      .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+        pathStrings.map(new Path(_)).toSeq.map { path =>
+          (path, listLeafFiles(path, hadoopConf, filter, None))
+        }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -372,11 +368,20 @@ object PartitioningAwareFileIndex extends Logging {
   }
 
   /**
-   * List a single path, provided as a FileStatus, in serial.
+   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+   * function may launch Spark jobs to parallelize listing.
+   *
+   * If sessionOpt is None, this may be called on executors.
+   *
+   * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+  private def listLeafFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
+    val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
@@ -391,9 +396,15 @@ object PartitioningAwareFileIndex extends Logging {
       }
 
       val allLeafStatuses = {
-        val (dirs, files) = statuses.partition(_.isDirectory)
-        val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+        val nestedFiles: Seq[FileStatus] = sessionOpt match {
+          case Some(session) =>
+            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+          case _ =>
+            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+        }
+        val allFiles = topLevelFiles ++ nestedFiles
+        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
       }
 
       allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
similarity index 70%
rename from sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
rename to sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 56df1face636485e09176e7916f0a779f50811c8..b7a472b7f0919338d7c495fa5000336e05711ec4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        val topLevelDirs = (1 to scale).map { i =>
+          val tmp = new File(dir, s"foo=$i.txt")
+          tmp.mkdir()
+          new Path(tmp.getCanonicalPath)
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        for (i <- 1 to scale) {
+          new File(dir, s"foo=$i.txt").mkdir()
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+      withTempDir { dir =>
+        for (i <- 1 to 2) {
+          val subdirA = new File(dir, s"a=$i")
+          subdirA.mkdir()
+          for (j <- 1 to 2) {
+            val subdirB = new File(subdirA, s"b=$j")
+            subdirB.mkdir()
+            for (k <- 1 to scale) {
+              new File(subdirB, s"foo=$k.txt").mkdir()
+            }
+          }
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex - file filtering") {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
     assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))