Commit 61e48f52 authored by Liang-Chi Hsieh, committed by Wenchen Fan

[SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet

## What changes were proposed in this pull request?

We have a config, `spark.sql.files.ignoreCorruptFiles`, which can be used to ignore corrupt files when reading files in SQL (a brief usage sketch follows the list below). Currently the `ignoreCorruptFiles` config has two issues and doesn't work for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. In fact, we begin reading those files as early as schema inference; for corrupt files, we can't read the schema, and the program fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when we start consuming the iterator. However, the files may be read before that (e.g., by the vectorized Parquet reader); in that case, the `ignoreCorruptFiles` config doesn't work either.
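
For context, here is a minimal usage sketch of the config (not part of this patch; the path and session setup are placeholders). With the config enabled, rows from the readable files are returned and the corrupt ones are skipped; with it disabled, the read fails.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch, assuming a directory that mixes valid Parquet files with a
// corrupted or non-Parquet file. The path below is a placeholder.
val spark = SparkSession.builder().getOrCreate()
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Rows from the readable Parquet files are returned; the corrupt file is
// skipped instead of failing the whole query.
val df = spark.read.parquet("/path/to/mixed/files")
df.show()
```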

This patch targets the Parquet data source. If this direction is OK, we can address the same issue for other data sources such as ORC.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` with our own logic for reading footers in a multi-threaded manner

    We can't ignore corrupt files if we use `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in a new `readParquetFootersInParallel` method (a generic sketch of this idiom follows the list).

2. In `FileScanRDD`, we also need to ignore corrupt files when calling `readFunction` to obtain the iterator.
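
To illustrate the multi-threaded idiom used in change 1, here is a generic sketch (the helper name `processInParallel` is made up for illustration) of running per-item work on a Scala parallel collection backed by a bounded `ForkJoinPool` and dropping items whose processing throws; the actual `readParquetFootersInParallel` appears in the diff below.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

// Illustrative helper (not part of the patch): map `f` over `items` using a
// bounded ForkJoinPool, skipping items whose processing throws, mirroring how
// readParquetFootersInParallel tolerates corrupt footers.
def processInParallel[A, B](items: Seq[A], parallelism: Int)(f: A => B): Seq[B] = {
  val parItems = items.par
  parItems.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(parallelism))
  parItems.flatMap { item =>
    try Some(f(item)) catch { case _: RuntimeException => None }
  }.seq
}
```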

One thing to notice is:

We read the schema from a Parquet file's footer. The footer-reading method, `ParquetFileReader.readFooter`, throws a `RuntimeException` rather than an `IOException` if it can't successfully read the footer; see https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow runtime exceptions unrelated to corrupt files.
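
As a condensed, self-contained sketch of the resulting pattern (the helper name `readFooterOrSkip` is made up for illustration; the real version is `readParquetFootersInParallel` in the diff below):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Because ParquetFileReader.readFooter signals an unreadable footer with a
// RuntimeException, that is what we catch; an unrelated RuntimeException
// thrown here would unavoidably be treated as a corrupt file too.
def readFooterOrSkip(
    conf: Configuration,
    file: FileStatus,
    ignoreCorruptFiles: Boolean): Option[Footer] = {
  try {
    Some(new Footer(file.getPath, ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)))
  } catch {
    case e: RuntimeException =>
      if (ignoreCorruptFiles) {
        None // skip the corrupt file
      } else {
        throw new IOException(s"Could not read footer for file: $file", e)
      }
  }
}
```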

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.
parent de62ddf7
......@@ -20,7 +20,7 @@ package org.apache.spark.rdd
import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.reflect.ClassTag
......
......@@ -135,7 +135,17 @@ class FileScanRDD(
      try {
        if (ignoreCorruptFiles) {
          currentIterator = new NextIterator[Object] {
            private val internalIter = readFunction(currentFile)
            private val internalIter = {
              try {
                // The readFunction may read files before consuming the iterator.
                // E.g., vectorized Parquet reader.
                readFunction(currentFile)
              } catch {
                case e @ (_: RuntimeException | _: IOException) =>
                  logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
                  Iterator.empty
              }
            }

            override def getNext(): AnyRef = {
              try {
......
......@@ -17,10 +17,13 @@
package org.apache.spark.sql.execution.datasources.parquet
import java.io.IOException
import java.net.URI
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool
import scala.util.{Failure, Try}
import org.apache.hadoop.conf.Configuration
......@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop._
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
......@@ -151,7 +155,7 @@ class ParquetFileFormat
    }
  }

  def inferSchema(
  override def inferSchema(
      sparkSession: SparkSession,
      parameters: Map[String, String],
      files: Seq[FileStatus]): Option[StructType] = {
......@@ -542,6 +546,36 @@ object ParquetFileFormat extends Logging {
    StructType(parquetSchema ++ missingFields)
  }

  /**
   * Reads Parquet footers in multi-threaded manner.
   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
   * files when reading footers.
   */
  private[parquet] def readParquetFootersInParallel(
      conf: Configuration,
      partFiles: Seq[FileStatus],
      ignoreCorruptFiles: Boolean): Seq[Footer] = {
    val parFiles = partFiles.par
    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
    parFiles.flatMap { currentFile =>
      try {
        // Skips row group information since we only need the schema.
        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
        // when it can't read the footer.
        Some(new Footer(currentFile.getPath(),
          ParquetFileReader.readFooter(
            conf, currentFile, SKIP_ROW_GROUPS)))
      } catch { case e: RuntimeException =>
        if (ignoreCorruptFiles) {
          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
          None
        } else {
          throw new IOException(s"Could not read footer for file: $currentFile", e)
        }
      }
    }.seq
  }

  /**
   * Figures out a merged Parquet schema with a distributed Spark job.
   *
......@@ -582,6 +616,8 @@ object ParquetFileFormat extends Logging {
    val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
      sparkSession.sparkContext.defaultParallelism)

    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles

    // Issues a Spark job to read Parquet schema in parallel.
    val partiallyMergedSchemas =
      sparkSession
......@@ -593,13 +629,10 @@ object ParquetFileFormat extends Logging {
            new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
          }.toSeq

          // Skips row group information since we only need the schema
          val skipRowGroups = true

          // Reads footers in multi-threaded manner within each task
          val footers =
            ParquetFileReader.readAllFootersInParallel(
              serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
            ParquetFileFormat.readParquetFootersInParallel(
              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)

          // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
          val converter =
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkException
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {

  test("read parquet footers in parallel") {
    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
      withTempDir { dir =>
        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
        val basePath = dir.getCanonicalPath

        val path1 = new Path(basePath, "first")
        val path2 = new Path(basePath, "second")
        val path3 = new Path(basePath, "third")
        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)

        val fileStatuses =
          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten

        val footers = ParquetFileFormat.readParquetFootersInParallel(
          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)

        assert(footers.size == 2)
      }
    }

    testReadFooters(true)
    val exception = intercept[java.io.IOException] {
      testReadFooters(false)
    }
    assert(exception.getMessage().contains("Could not read footer for file"))
  }
}
......@@ -22,6 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
......@@ -217,6 +218,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
    }
  }

  test("Enabling/disabling ignoreCorruptFiles") {
    def testIgnoreCorruptFiles(): Unit = {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString,
          new Path(basePath, "third").toString)
        checkAnswer(
          df,
          Seq(Row(0), Row(1)))
      }
    }

    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
      testIgnoreCorruptFiles()
    }

    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
      val exception = intercept[SparkException] {
        testIgnoreCorruptFiles()
      }
      assert(exception.getMessage().contains("is not a Parquet file"))
    }
  }

  test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
    withTempPath { dir =>
      val basePath = dir.getCanonicalPath
......