Commit 65e896a6 authored by Eric Liang, committed by Wenchen Fan

[SPARK-18679][SQL] Fix regression in file listing performance for non-catalog tables


## What changes were proposed in this pull request?

In Spark 2.1, `ListingFileCatalog` was significantly refactored (and renamed to `InMemoryFileIndex`). This introduced a regression where parallelism could only be introduced at the very top of the directory tree. However, in many cases (e.g. `spark.read.parquet(topLevelDir)`), the top of the tree is only a single directory.

This PR simplifies and fixes the parallel recursive listing code to allow parallelism to be introduced at any level during recursive descent (though note that once we decide to list a sub-tree in parallel, that sub-tree is listed serially on executors).
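To make the descent concrete, here is a minimal sketch of the idea in Scala. It is not the actual implementation (that is `PartitioningAwareFileIndex.bulkListLeafFiles` / `listLeafFiles` in the diff below): the `ListingSketch` object, the explicit `threshold` parameter, and returning plain path strings are simplifications, and the real code reads the threshold from `spark.sql.sources.parallelPartitionDiscovery.threshold` and ships the Hadoop configuration and file statuses in serializable wrappers.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

// Sketch only; Serializable so the Spark closure below can reference these helpers.
object ListingSketch extends Serializable {

  // Lists one directory level; files are kept, sub-directories recurse. When a
  // SparkSession is present (driver side), the recursion may fan out into a Spark
  // job at *this* level rather than only at the root.
  def listLeafFiles(
      path: Path,
      conf: Configuration,
      threshold: Int,
      session: Option[SparkSession]): Seq[String] = {
    val fs = path.getFileSystem(conf)
    val (dirs, files) = fs.listStatus(path).partition(_.isDirectory)
    val nested: Seq[String] = session match {
      case Some(s) => bulkListLeafFiles(dirs.map(_.getPath), conf, threshold, s)
      case None    => dirs.flatMap(d => listLeafFiles(d.getPath, conf, threshold, None))
    }
    files.map(_.getPath.toString) ++ nested
  }

  // Fans out only once enough directories accumulate at a single level; each
  // sub-tree is then listed serially by one task (executors get no SparkSession).
  def bulkListLeafFiles(
      paths: Seq[Path],
      conf: Configuration,
      threshold: Int,
      session: SparkSession): Seq[String] = {
    if (paths.length < threshold) {
      paths.flatMap(p => listLeafFiles(p, conf, threshold, Some(session)))
    } else {
      session.sparkContext
        .parallelize(paths.map(_.toString), paths.length)
        // Executors rebuild a default Hadoop conf here; the real code broadcasts it.
        .flatMap(p => listLeafFiles(new Path(p), new Configuration(), threshold, None))
        .collect().toSeq
    }
  }
}
```

With this shape, `spark.read.parquet(topLevelDir)` starts as a single serial listing at the root, and a listing job is launched as soon as some level of the tree exposes enough directories, instead of never.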

cc mallman  cloud-fan

## How was this patch tested?

Checked metrics in unit tests: the new FileIndexSuite cases assert on the `parallelListingJobCount` metric.
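Abridged from the new tests in the diff below, each case follows this pattern (`expectedNumPar` is the number of parallel listing jobs expected for the directory layout under test):

```scala
HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
```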

Author: Eric Liang <ekl@databricks.com>

Closes #16112 from ericl/spark-18679.

(cherry picked from commit 294163ee)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
parent a7f8ebb8
@@ -90,6 +90,12 @@ object HiveCatalogMetrics extends Source {
    */
   val METRIC_HIVE_CLIENT_CALLS = metricRegistry.counter(MetricRegistry.name("hiveClientCalls"))
 
+  /**
+   * Tracks the total number of Spark jobs launched for parallel file listing.
+   */
+  val METRIC_PARALLEL_LISTING_JOB_COUNT = metricRegistry.counter(
+    MetricRegistry.name("parallelListingJobCount"))
+
   /**
    * Resets the values of all metrics to zero. This is useful in tests.
    */
@@ -98,6 +104,7 @@ object HiveCatalogMetrics extends Source {
     METRIC_FILES_DISCOVERED.dec(METRIC_FILES_DISCOVERED.getCount())
     METRIC_FILE_CACHE_HITS.dec(METRIC_FILE_CACHE_HITS.getCount())
     METRIC_HIVE_CLIENT_CALLS.dec(METRIC_HIVE_CLIENT_CALLS.getCount())
+    METRIC_PARALLEL_LISTING_JOB_COUNT.dec(METRIC_PARALLEL_LISTING_JOB_COUNT.getCount())
   }
 
   // clients can use these to avoid classloader issues with the codahale classes
@@ -105,4 +112,5 @@ object HiveCatalogMetrics extends Source {
   def incrementFilesDiscovered(n: Int): Unit = METRIC_FILES_DISCOVERED.inc(n)
   def incrementFileCacheHits(n: Int): Unit = METRIC_FILE_CACHE_HITS.inc(n)
   def incrementHiveClientCalls(n: Int): Unit = METRIC_HIVE_CLIENT_CALLS.inc(n)
+  def incrementParallelListingJobCount(n: Int): Unit = METRIC_PARALLEL_LISTING_JOB_COUNT.inc(n)
 }
@@ -249,12 +249,9 @@ abstract class PartitioningAwareFileIndex(
           pathsToFetch += path
       }
     }
-    val discovered = if (pathsToFetch.length >=
-        sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
-      PartitioningAwareFileIndex.listLeafFilesInParallel(pathsToFetch, hadoopConf, sparkSession)
-    } else {
-      PartitioningAwareFileIndex.listLeafFilesInSerial(pathsToFetch, hadoopConf)
-    }
+    val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
+    val discovered = PartitioningAwareFileIndex.bulkListLeafFiles(
+      pathsToFetch, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -286,31 +283,28 @@ object PartitioningAwareFileIndex extends Logging {
       blockLocations: Array[SerializableBlockLocation])
 
   /**
-   * List a collection of path recursively.
-   */
-  private def listLeafFilesInSerial(
-      paths: Seq[Path],
-      hadoopConf: Configuration): Seq[(Path, Seq[FileStatus])] = {
-    // Dummy jobconf to get to the pathFilter defined in configuration
-    val jobConf = new JobConf(hadoopConf, this.getClass)
-    val filter = FileInputFormat.getInputPathFilter(jobConf)
-    paths.map { path =>
-      val fs = path.getFileSystem(hadoopConf)
-      (path, listLeafFiles0(fs, path, filter))
-    }
-  }
-
-  /**
-   * List a collection of path recursively in parallel (using Spark executors).
-   * Each task launched will use [[listLeafFilesInSerial]] to list.
+   * Lists a collection of paths recursively. Picks the listing strategy adaptively depending
+   * on the number of paths to list.
    *
    * This may only be called on the driver.
    *
    * @return for each input path, the set of discovered files for the path
    */
-  private def listLeafFilesInParallel(
+  private def bulkListLeafFiles(
       paths: Seq[Path],
       hadoopConf: Configuration,
+      filter: PathFilter,
       sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
-    assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
+
+    // Short-circuits parallel listing when serial listing is likely to be faster.
+    if (paths.size < sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
+      return paths.map { path =>
+        (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
+      }
+    }
+
     logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
+    HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
     val sparkContext = sparkSession.sparkContext
     val serializableConfiguration = new SerializableConfiguration(hadoopConf)
@@ -322,9 +316,11 @@ object PartitioningAwareFileIndex extends Logging {
 
     val statusMap = sparkContext
       .parallelize(serializedPaths, numParallelism)
-      .mapPartitions { paths =>
+      .mapPartitions { pathStrings =>
         val hadoopConf = serializableConfiguration.value
-        listLeafFilesInSerial(paths.map(new Path(_)).toSeq, hadoopConf).iterator
+        pathStrings.map(new Path(_)).toSeq.map { path =>
+          (path, listLeafFiles(path, hadoopConf, filter, None))
+        }.iterator
       }.map { case (path, statuses) =>
         val serializableStatuses = statuses.map { status =>
           // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
@@ -372,11 +368,20 @@ object PartitioningAwareFileIndex extends Logging {
   }
 
   /**
-   * List a single path, provided as a FileStatus, in serial.
+   * Lists a single filesystem path recursively. If a SparkSession object is specified, this
+   * function may launch Spark jobs to parallelize listing.
+   *
+   * If sessionOpt is None, this may be called on executors.
+   *
+   * @return all children of path that match the specified filter.
    */
-  private def listLeafFiles0(
-      fs: FileSystem, path: Path, filter: PathFilter): Seq[FileStatus] = {
+  private def listLeafFiles(
+      path: Path,
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      sessionOpt: Option[SparkSession]): Seq[FileStatus] = {
     logTrace(s"Listing $path")
+    val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
     if (shouldFilterOut(name)) {
       Seq.empty[FileStatus]
@@ -391,9 +396,15 @@ object PartitioningAwareFileIndex extends Logging {
     }
 
     val allLeafStatuses = {
-      val (dirs, files) = statuses.partition(_.isDirectory)
-      val stats = files ++ dirs.flatMap(dir => listLeafFiles0(fs, dir.getPath, filter))
-      if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
+      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
+        case Some(session) =>
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, session).flatMap(_._2)
+        case _ =>
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, sessionOpt))
+      }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else allFiles
     }
 
     allLeafStatuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
@@ -25,6 +25,7 @@ import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
 
+import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.test.SharedSQLContext
@@ -81,6 +82,58 @@ class FileIndexSuite extends SharedSQLContext {
     }
   }
 
+  test("PartitioningAwareFileIndex listing parallelized with many top level dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        val topLevelDirs = (1 to scale).map { i =>
+          val tmp = new File(dir, s"foo=$i.txt")
+          tmp.mkdir()
+          new Path(tmp.getCanonicalPath)
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, topLevelDirs, Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 1))) {
+      withTempDir { dir =>
+        for (i <- 1 to scale) {
+          new File(dir, s"foo=$i.txt").mkdir()
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
+  test("PartitioningAwareFileIndex listing parallelized with large, deeply nested child dirs") {
+    for ((scale, expectedNumPar) <- Seq((10, 0), (50, 4))) {
+      withTempDir { dir =>
+        for (i <- 1 to 2) {
+          val subdirA = new File(dir, s"a=$i")
+          subdirA.mkdir()
+          for (j <- 1 to 2) {
+            val subdirB = new File(subdirA, s"b=$j")
+            subdirB.mkdir()
+            for (k <- 1 to scale) {
+              new File(subdirB, s"foo=$k.txt").mkdir()
+            }
+          }
+        }
+        HiveCatalogMetrics.reset()
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
+        new InMemoryFileIndex(spark, Seq(new Path(dir.getCanonicalPath)), Map.empty, None)
+        assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == expectedNumPar)
+      }
+    }
+  }
+
   test("PartitioningAwareFileIndex - file filtering") {
     assert(!PartitioningAwareFileIndex.shouldFilterOut("abcd"))
     assert(PartitioningAwareFileIndex.shouldFilterOut(".ab"))