diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index dd3c96a792357a4200a6de3532b0737f027f5d83..7d2854aaad3940ec306da47bce1e2a4435638af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -75,7 +75,7 @@ class ListingFileCatalog(
 
   protected def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
     if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession.sparkContext)
+      HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
     } else {
       // Dummy jobconf to get to the pathFilter defined in configuration
       val jobConf = new JobConf(hadoopConf, this.getClass)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 890e64db599454edb9a0bc75bf1c9ae40e2bba45..9c1898994c9a97c21d3f83677c8f3849e3775703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -448,13 +448,22 @@ private[sql] object HadoopFsRelation extends Logging {
   def listLeafFilesInParallel(
       paths: Seq[Path],
       hadoopConf: Configuration,
-      sparkContext: SparkContext): mutable.LinkedHashSet[FileStatus] = {
+      sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
+    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
 
+    val sparkContext = sparkSession.sparkContext
+    val sqlConf = sparkSession.sessionState.conf
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
     val serializedPaths = paths.map(_.toString)
 
-    val fakeStatuses = sparkContext.parallelize(serializedPaths).mapPartitions { paths =>
+    // Set the number of parallelism to prevent following file listing from generating many tasks
+    // in case of large #defaultParallelism.
+    val numParallelism = Math.min(paths.size, 10000)
+
+    val fakeStatuses = sparkContext
+        .parallelize(serializedPaths, numParallelism)
+        .mapPartitions { paths =>
       // Dummy jobconf to get to the pathFilter defined in configuration
       // It's very expensive to create a JobConf(ClassUtil.findContainingJar() is slow)
       val jobConf = new JobConf(serializableConfiguration.value, this.getClass)