From 4f3ce062ce2e9b403f9d38a44eb7fc76a800ed67 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh <viirya@gmail.com>
Date: Mon, 16 Jan 2017 15:26:41 +0800
Subject: [PATCH] [SPARK-19082][SQL] Make ignoreCorruptFiles work for Parquet

## What changes were proposed in this pull request?

We have a config `spark.sql.files.ignoreCorruptFiles` which can be used to ignore corrupt files when reading files in SQL. Currently the `ignoreCorruptFiles` config has two issues that prevent it from working for Parquet:

1. We only ignore corrupt files in `FileScanRDD`. Actually, we begin to read those files as early as schema inference. For corrupt files, we can't read the schema, so the job fails. A related issue was reported at http://apache-spark-developers-list.1001551.n3.nabble.com/Skip-Corrupted-Parquet-blocks-footer-tc20418.html
2. In `FileScanRDD`, we assume that we only begin to read the files when starting to consume the iterator. However, the files may be read before that (e.g., by the vectorized Parquet reader). In this case, the `ignoreCorruptFiles` config doesn't work either.
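
For reference, a minimal usage sketch of the existing config (the config key is real; the session setup and path are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ignore-corrupt-files-example")
  .master("local[*]")
  .getOrCreate()

// With this flag set, readers that honor it skip corrupt files instead of failing.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Reading a directory that mixes valid and corrupt Parquet files should then
// return only the rows from the valid files.
val df = spark.read.parquet("/path/to/parquet/dir")
```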

This patch targets the Parquet datasource. If this direction is acceptable, we can address the same issue for other datasources such as ORC.

Two main changes in this patch:

1. Replace `ParquetFileReader.readAllFootersInParallel` with our own logic for reading footers in a multi-threaded manner

    We can't ignore corrupt files when using `ParquetFileReader.readAllFootersInParallel`, so this patch implements similar logic in `readParquetFootersInParallel`.

2. In `FileScanRDD`, we also need to ignore corrupt files when calling `readFunction` to obtain the iterator.

One thing to note:

We read the schema from a Parquet file's footer. The footer-reading method `ParquetFileReader.readFooter` throws `RuntimeException`, instead of `IOException`, if it can't successfully read the footer. Please check out https://github.com/apache/parquet-mr/blob/df9d8e415436292ae33e1ca0b8da256640de9710/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L470. So this patch catches `RuntimeException`. One concern is that it might also shadow runtime exceptions unrelated to reading corrupt files.
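
A condensed, standalone sketch of the catch pattern (the real implementation is `readParquetFootersInParallel` in the diff below; the helper name here is made up for illustration):

```scala
import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.{Footer, ParquetFileReader}

// Illustrative helper: read one footer, skipping the file if it is corrupt.
def readFooterIgnoringCorruption(
    conf: Configuration,
    file: FileStatus,
    ignoreCorruptFiles: Boolean): Option[Footer] = {
  try {
    // readFooter throws RuntimeException, not IOException, on a corrupt footer.
    Some(new Footer(file.getPath,
      ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)))
  } catch {
    case _: RuntimeException if ignoreCorruptFiles =>
      // Caveat: this can also swallow RuntimeExceptions unrelated to corruption.
      None
    case e: RuntimeException =>
      throw new IOException(s"Could not read footer for file: $file", e)
  }
}
```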

## How was this patch tested?

Jenkins tests, including the new `ParquetFileFormatSuite` and new test cases in `ParquetQuerySuite` added by this patch.


Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16474 from viirya/fix-ignorecorrupted-parquet-files.

(cherry picked from commit 61e48f52d1d8c7431707bd3511b6fe9f0ae996c0)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
---
 .../scala/org/apache/spark/rdd/UnionRDD.scala |  2 +-
 .../execution/datasources/FileScanRDD.scala   | 12 +++-
 .../parquet/ParquetFileFormat.scala           | 45 ++++++++++++--
 .../parquet/ParquetFileFormatSuite.scala      | 59 +++++++++++++++++++
 .../parquet/ParquetQuerySuite.scala           | 30 ++++++++++
 5 files changed, 140 insertions(+), 8 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index ad1fddbde7..60e383afad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.parallel.{ForkJoinTaskSupport, ThreadPoolTaskSupport}
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index b926b92074..843459221e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -134,7 +134,17 @@ class FileScanRDD(
           try {
             if (ignoreCorruptFiles) {
               currentIterator = new NextIterator[Object] {
-                private val internalIter = readFunction(currentFile)
+                private val internalIter = {
+                  try {
+                    // The readFunction may read files before consuming the iterator.
+                    // E.g., vectorized Parquet reader.
+                    readFunction(currentFile)
+                  } catch {
+                    case e @(_: RuntimeException | _: IOException) =>
+                      logWarning(s"Skipped the rest content in the corrupted file: $currentFile", e)
+                      Iterator.empty
+                  }
+                }
 
                 override def getNext(): AnyRef = {
                   try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 0965ffebea..0e1fc7ae96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.IOException
 import java.net.URI
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.util.{Failure, Try}
 
 import org.apache.hadoop.conf.Configuration
@@ -30,6 +33,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.parquet.filter2.compat.FilterCompat
 import org.apache.parquet.filter2.predicate.FilterApi
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.codec.CodecConfig
 import org.apache.parquet.hadoop.util.ContextUtil
@@ -151,7 +155,7 @@ class ParquetFileFormat
     }
   }
 
-  def inferSchema(
+  override def inferSchema(
       sparkSession: SparkSession,
       parameters: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
@@ -547,6 +551,36 @@ object ParquetFileFormat extends Logging {
     StructType(parquetSchema ++ missingFields)
   }
 
+  /**
+   * Reads Parquet footers in multi-threaded manner.
+   * If the config "spark.sql.files.ignoreCorruptFiles" is set to true, we will ignore the corrupted
+   * files when reading footers.
+   */
+  private[parquet] def readParquetFootersInParallel(
+      conf: Configuration,
+      partFiles: Seq[FileStatus],
+      ignoreCorruptFiles: Boolean): Seq[Footer] = {
+    val parFiles = partFiles.par
+    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
+    parFiles.flatMap { currentFile =>
+      try {
+        // Skips row group information since we only need the schema.
+        // ParquetFileReader.readFooter throws RuntimeException, instead of IOException,
+        // when it can't read the footer.
+        Some(new Footer(currentFile.getPath(),
+          ParquetFileReader.readFooter(
+            conf, currentFile, SKIP_ROW_GROUPS)))
+      } catch { case e: RuntimeException =>
+        if (ignoreCorruptFiles) {
+          logWarning(s"Skipped the footer in the corrupted file: $currentFile", e)
+          None
+        } else {
+          throw new IOException(s"Could not read footer for file: $currentFile", e)
+        }
+      }
+    }.seq
+  }
+
   /**
    * Figures out a merged Parquet schema with a distributed Spark job.
    *
@@ -587,6 +621,8 @@ object ParquetFileFormat extends Logging {
     val numParallelism = Math.min(Math.max(partialFileStatusInfo.size, 1),
       sparkSession.sparkContext.defaultParallelism)
 
+    val ignoreCorruptFiles = sparkSession.sessionState.conf.ignoreCorruptFiles
+
     // Issues a Spark job to read Parquet schema in parallel.
     val partiallyMergedSchemas =
       sparkSession
@@ -598,13 +634,10 @@ object ParquetFileFormat extends Logging {
             new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(path))
           }.toSeq
 
-          // Skips row group information since we only need the schema
-          val skipRowGroups = true
-
           // Reads footers in multi-threaded manner within each task
           val footers =
-            ParquetFileReader.readAllFootersInParallel(
-              serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala
+            ParquetFileFormat.readParquetFootersInParallel(
+              serializedConf.value, fakeFileStatuses, ignoreCorruptFiles)
 
           // Converter used to convert Parquet `MessageType` to Spark SQL `StructType`
           val converter =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
new file mode 100644
index 0000000000..ccb34355f1
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormatSuite.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSQLContext {
+
+  test("read parquet footers in parallel") {
+    def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
+      withTempDir { dir =>
+        val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+        val basePath = dir.getCanonicalPath
+
+        val path1 = new Path(basePath, "first")
+        val path2 = new Path(basePath, "second")
+        val path3 = new Path(basePath, "third")
+
+        spark.range(1).toDF("a").coalesce(1).write.parquet(path1.toString)
+        spark.range(1, 2).toDF("a").coalesce(1).write.parquet(path2.toString)
+        spark.range(2, 3).toDF("a").coalesce(1).write.json(path3.toString)
+
+        val fileStatuses =
+          Seq(fs.listStatus(path1), fs.listStatus(path2), fs.listStatus(path3)).flatten
+
+        val footers = ParquetFileFormat.readParquetFootersInParallel(
+          sparkContext.hadoopConfiguration, fileStatuses, ignoreCorruptFiles)
+
+        assert(footers.size == 2)
+      }
+    }
+
+    testReadFooters(true)
+    val exception = intercept[java.io.IOException] {
+      testReadFooters(false)
+    }
+    assert(exception.getMessage().contains("Could not read footer for file"))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 4c4a7d86f2..6132376724 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.parquet.hadoop.ParquetOutputFormat
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
@@ -212,6 +213,35 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
     }
   }
 
+  test("Enabling/disabling ignoreCorruptFiles") {
+    def testIgnoreCorruptFiles(): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        spark.range(1).toDF("a").write.parquet(new Path(basePath, "first").toString)
+        spark.range(1, 2).toDF("a").write.parquet(new Path(basePath, "second").toString)
+        spark.range(2, 3).toDF("a").write.json(new Path(basePath, "third").toString)
+        val df = spark.read.parquet(
+          new Path(basePath, "first").toString,
+          new Path(basePath, "second").toString,
+          new Path(basePath, "third").toString)
+        checkAnswer(
+          df,
+          Seq(Row(0), Row(1)))
+      }
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+      testIgnoreCorruptFiles()
+    }
+
+    withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+      val exception = intercept[SparkException] {
+        testIgnoreCorruptFiles()
+      }
+      assert(exception.getMessage().contains("is not a Parquet file"))
+    }
+  }
+
   test("SPARK-8990 DataFrameReader.parquet() should respect user specified options") {
     withTempPath { dir =>
       val basePath = dir.getCanonicalPath
-- 
GitLab