diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddbde7b00dbdad5939d81b36d2337a09e30a..60e383afadf1c80ac368422b542943971f8b0bf2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index b926b9207416fe5a2a4380ab17feba3b78996218..843459221e6898e72125873d6c4237a6b9f35ec9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -134,7 +134,17 @@ class FileScanRDD(
 
         try {
           if (ignoreCorruptFiles) {
             currentIterator = new NextIterator[Object] {
-              private val internalIter = readFunction(currentFile)
+              private val internalIter = {
+                try {
+                  // The readFunction may read files before consuming the iterator.
+                  // E.g., the vectorized Parquet reader.
+                  readFunction(currentFile)
+                } catch {
+                  case e @ (_: RuntimeException | _: IOException) =>
+                    logWarning(s"Skipped the rest of the content in the corrupted file: $currentFile", e)
+                    Iterator.empty
+                }
+              }
               override def getNext(): AnyRef = {
                 try {
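The subtlety this FileScanRDD change addresses is that constructing the reader can itself throw for a corrupted file (the vectorized Parquet reader reads the footer eagerly), so the try/catch must wrap the construction of the iterator, not only the calls that consume it. A minimal, self-contained sketch of that pattern, assuming nothing from Spark; openOrSkip and its arguments are illustrative names, not Spark APIs:

import java.io.IOException

object OpenOrSkipExample {
  // Build the per-file iterator inside try/catch so that a failure thrown at
  // construction time also degrades to an empty iterator instead of failing the task.
  def openOrSkip[T](file: String)(open: String => Iterator[T]): Iterator[T] = {
    try {
      open(file) // may throw before the first element is ever requested
    } catch {
      case e @ (_: RuntimeException | _: IOException) =>
        Console.err.println(s"Skipped the rest of the content in the corrupted file: $file ($e)")
        Iterator.empty
    }
  }

  def main(args: Array[String]): Unit = {
    // A "reader" that fails at construction, as an eager footer read would.
    val rows = openOrSkip("bad.parquet")(_ => throw new IOException("not a Parquet file"))
    println(rows.toList) // List(): the corrupt file contributes no rows
  }
}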
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffebea9620c687398fb2abd6e5f98a9853fa..0e1fc7ae961357d33f4880d3f508a36b8783b085 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -547,6 +551,36 @@ object ParquetFileFormat extends Logging {
     StructType(parquetSchema ++ missingFields)
   }
 
+  /**
+   * Reads Parquet footers in a multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, corrupted files will be
+   * ignored when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
   /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
@@ -587,6 +621,8 @@
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -598,13 +634,10 @@
           new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
         }.toSeq
 
-        // Skips row group information since we only need the schema
-        val skipRowGroups = true
-
         // Reads footers in multi-threaded manner within each task
         val footers =
-          ParquetFileReader.readAllFootersInParallel(
-            serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+          ParquetFileFormat.readParquetFootersInParallel(
+            serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
         // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
         val converter =
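For reference, the parallel-collection idiom that readParquetFootersInParallel relies on, a dedicated ForkJoinPool capping concurrency at 8 and a flatMap over Option so skipped files simply drop out, looks like this in a toy, Spark-free sketch (the file names and the failure condition are made up for illustration):

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object ParallelFootersSketch {
  def main(args: Array[String]): Unit = {
    val files = Seq("a.parquet", "b.parquet", "corrupt.parquet")

    val parFiles = files.par
    // Bound the parallelism with a private pool instead of the default global one.
    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

    val footers = parFiles.flatMap { f =>
      try {
        if (f.startsWith("corrupt")) throw new RuntimeException(s"$f is not a Parquet file")
        Some(f.toUpperCase) // stand-in for reading a real footer
      } catch {
        case _: RuntimeException => None // ignoreCorruptFiles semantics: drop and continue
      }
    }.seq

    println(footers) // Vector(A.PARQUET, B.PARQUET)
  }
}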
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ccb34355f1bac0a8b5592076fbaf71ef2a875bf2
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d86f2bd3e710b09815657754df7f864db37..613237672492ba623fb1ba994e079384fb0782ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
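End to end, the behavior the two suites pin down can be reproduced from a user session along these lines (a hypothetical spark-shell transcript; the /tmp paths are placeholders, and "third" is written as JSON so its files are not valid Parquet):

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.read.parquet("/tmp/first", "/tmp/second", "/tmp/third").show()
// rows from the two valid Parquet directories only; the JSON files are skipped with a warning

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "false")
spark.read.parquet("/tmp/first", "/tmp/second", "/tmp/third").show()
// fails with org.apache.spark.SparkException whose message contains "is not a Parquet file"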