From 192e42a2933eb283e12bfdfb46e2ef895228af4a Mon Sep 17 00:00:00 2001
From: Yin Huai <yhuai@databricks.com>
Date: Fri, 27 Feb 2015 01:01:32 +0800
Subject: [PATCH] [SPARK-6016][SQL] Cannot read the parquet table after
 overwriting the existing table when spark.sql.parquet.cacheMetadata=true

Please see JIRA (https://issues.apache.org/jira/browse/SPARK-6016) for details of the bug.

Author: Yin Huai <yhuai@databricks.com>

Closes #4775 from yhuai/parquetFooterCache and squashes the following commits:

78787b1 [Yin Huai] Remove footerCache in FilteringParquetRowInputFormat.
dff6fba [Yin Huai] Failed unit test.
---
 .../sql/parquet/ParquetTableOperations.scala  | 49 +++----------------
 .../apache/spark/sql/parquet/newParquet.scala |  8 ++-
 .../spark/sql/parquet/parquetSuites.scala     | 27 ++++++++++
 3 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 4dc13b036c..9061d3f5fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
+  // This is only a temporary solution sicne we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+  // two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
 
-    footers
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 16b771344b..e648618468 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
         @transient
         val cachedStatus = selectedFiles
 
+        @transient
+        val cachedFooters = selectedFooters
+
         // Overridden so we can inject our own cached files statuses.
         override def getPartitions: Array[SparkPartition] = {
           val inputFormat = if (cacheMetadata) {
             new FilteringParquetRowInputFormat {
               override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+              override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
             }
           } else {
             new FilteringParquetRowInputFormat
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
index 80fd5cda20..6a9d9daf67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/parquetSuites.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       )
     """)
   }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions. So, the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
+    // since the new table has four parquet files, we are trying to read new footers from two files
+    // and then merge metadata in footers of these four (two outdated ones and two latest one),
+    // which will cause an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
-- 
GitLab