Skip to content
Snippets Groups Projects
Commit a9339db9 authored by Sean Owen's avatar Sean Owen Committed by Wenchen Fan
Browse files

[SPARK-21137][CORE] Spark reads many small files slowly

## What changes were proposed in this pull request?

Parallelize FileInputFormat.listStatus in Hadoop API via LIST_STATUS_NUM_THREADS to speed up examination of file sizes for wholeTextFiles et al

## How was this patch tested?

Existing tests, which will exercise the key path here: using a local file system.

Author: Sean Owen <sowen@cloudera.com>

Closes #18441 from srowen/SPARK-21137.
parent d913db16
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.task.JobContextImpl
import org.apache.spark.{Partition, SparkContext}
......@@ -35,8 +36,12 @@ private[spark] class BinaryFileRDD[T](
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
val conf = getConf
// setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
// traversing a large number of directories and files. Parallelize it.
conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
Runtime.getRuntime.availableProcessors().toString)
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
......
......@@ -20,6 +20,7 @@ package org.apache.spark.rdd
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{Text, Writable}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.task.JobContextImpl
import org.apache.spark.{Partition, SparkContext}
......@@ -38,8 +39,12 @@ private[spark] class WholeTextFileRDD(
extends NewHadoopRDD[Text, Text](sc, inputFormatClass, keyClass, valueClass, conf) {
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
val conf = getConf
// setMinPartitions below will call FileInputFormat.listStatus(), which can be quite slow when
// traversing a large number of directories and files. Parallelize it.
conf.setIfUnset(FileInputFormat.LIST_STATUS_NUM_THREADS,
Runtime.getRuntime.availableProcessors().toString)
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(conf)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment