Skip to content
Snippets Groups Projects
Commit 6b68d61c authored by Liang-Chi Hsieh's avatar Liang-Chi Hsieh Committed by Wenchen Fan
Browse files

[SPARK-20848][SQL][FOLLOW-UP] Shutdown the pool after reading parquet files

## What changes were proposed in this pull request?

This is a follow-up to #18073. Taking a safer approach to shutdown the pool to prevent possible issue. Also using `ThreadUtils.newForkJoinPool` instead to set a better thread name.

## How was this patch tested?

Manually test.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18100 from viirya/SPARK-20848-followup.
parent 197f9018
No related branches found
No related tags found
No related merge requests found
......@@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
class ParquetFileFormat
extends FileFormat
......@@ -479,27 +479,29 @@ object ParquetFileFormat extends Logging {
partFiles: Seq[FileStatus],
ignoreCorruptFiles: Boolean): Seq[Footer] = {
val parFiles = partFiles.par
val pool = new ForkJoinPool(8)
val pool = ThreadUtils.newForkJoinPool("readingParquetFooters", 8)
parFiles.tasksupport = new ForkJoinTaskSupport(pool)
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
// when it can't read the footer.
Some(new Footer(currentFile.getPath(),
ParquetFileReader.readFooter(
conf, currentFile, SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
try {
parFiles.flatMap { currentFile =>
try {
// Skips row group information since we only need the schema.
// ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
// when it can't read the footer.
Some(new Footer(currentFile.getPath(),
ParquetFileReader.readFooter(
conf, currentFile, SKIP_ROW_GROUPS)))
} catch { case e: RuntimeException =>
if (ignoreCorruptFiles) {
logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
None
} else {
throw new IOException(s"Could not read footer for file: $currentFile", e)
}
}
} finally {
pool.shutdown()
}
}.seq
}.seq
} finally {
pool.shutdown()
}
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment