From 431542765785304edb76a19885fbc5f9b8ae7d64 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Thu, 2 Jun 2016 16:16:27 -0700
Subject: [PATCH] [SPARK-15719][SQL] Disables writing Parquet summary files by
 default

## What changes were proposed in this pull request?

This PR disables writing Parquet summary files by default (i.e., when the Hadoop configuration "parquet.enable.summary-metadata" is not set explicitly).

Please refer to [SPARK-15719][1] for more details.
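
For reference, here is a minimal sketch (not part of this patch) of how a user can keep the old behavior by setting the Hadoop configuration explicitly; the session setup and output path below are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session; any existing SparkSession works the same way.
val spark = SparkSession.builder()
  .appName("parquet-summary-example")
  .master("local[*]")
  .getOrCreate()

// The new default only kicks in when the key is unset, so setting it
// explicitly restores the previous behavior of writing summary files.
spark.sparkContext.hadoopConfiguration
  .set("parquet.enable.summary-metadata", "true")

// Hypothetical output path; with the flag enabled, the directory should
// contain _metadata and _common_metadata alongside the part files.
spark.range(10).write.mode("overwrite").parquet("/tmp/parquet-with-summary")
```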

## How was this patch tested?

A new test case is added in `ParquetQuerySuite` to check that no summary files are written by default.

[1]: https://issues.apache.org/jira/browse/SPARK-15719

Author: Cheng Lian <lian@databricks.com>

Closes #13455 from liancheng/spark-15719-disable-parquet-summary-files.
---
 .../parquet/ParquetFileFormat.scala           |  7 ++++-
 .../datasources/parquet/ParquetIOSuite.scala  | 20 ++++++------
 .../parquet/ParquetQuerySuite.scala           | 31 ++++++++++++++++---
 .../parquet/ParquetSchemaSuite.scala          | 19 ++----------
 .../ParquetHadoopFsRelationSuite.scala        | 29 +++++++++--------
 5 files changed, 62 insertions(+), 44 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ff7962df22..ada9cd4b8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -124,6 +124,11 @@ private[sql] class ParquetFileFormat
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
 
+    // SPARK-15719: Disables writing Parquet summary files by default.
+    if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
+      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
@@ -786,7 +791,7 @@ private[sql] object ParquetFileFormat extends Logging {
     //
     // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
     // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
-    // but only `Writable`.  What makes it worth, for some reason, `FileStatus` doesn't play well
+    // but only `Writable`.  What makes it worse, for some reason, `FileStatus` doesn't play well
     // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
     // facts virtually prevents us to serialize `FileStatus`es.
     //
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d0107aae5a..92f2db325c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -515,17 +515,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
 
-    withTempPath { dir =>
-      val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
-      spark.range(1 << 16).selectExpr("(id % 4) AS i")
-        .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        spark.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
 
-      val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
-      val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
 
-      // If the file is written with version2, this should include
-      // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
-      assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 0a2fb0ef50..78b97f6995 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -148,13 +149,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
 
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+    withSQLConf(
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true",
+      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true"
+    ) {
       testSchemaMerging(2)
     }
 
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+    withSQLConf(
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false"
+    ) {
       testSchemaMerging(3)
     }
   }
@@ -604,6 +610,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-15719: disable writing summary files by default") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(3).write.parquet(path)
+
+      val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+      val files = fs.listFiles(new Path(path), true)
+
+      while (files.hasNext) {
+        val file = files.next
+        assert(!file.getPath.getName.contains("_metadata"))
+      }
+    }
+  }
 }
 
 object TestingUDT {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0b5038cb82..865bfa24c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -451,31 +451,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
   }
 
   test("schema merging failure error message") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
-
-      val message = intercept[SparkException] {
-        spark.read.option("mergeSchema", "true").parquet(path).schema
-      }.getMessage
+    import testImplicits._
 
-      assert(message.contains("Failed merging schema of file"))
-    }
-
-    // test for second merging (after read Parquet schema in parallel done)
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
-
-      spark.sparkContext.conf.set("spark.default.parallelism", "20")
+      spark.range(3).select('id cast IntegerType as 'id).write.parquet(s"$path/p=2")
 
       val message = intercept[SparkException] {
         spark.read.option("mergeSchema", "true").parquet(path).schema
       }.getMessage
 
-      assert(message.contains("Failed merging schema:"))
+      assert(message.contains("Failed merging schema"))
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index f9a1d16d90..8aa018d0a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -124,23 +125,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = spark.range(0, 5).toDF()
-      df.write.mode(SaveMode.Overwrite).parquet(path)
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = spark.range(0, 5).toDF()
+        df.write.mode(SaveMode.Overwrite).parquet(path)
 
-      val summaryPath = new Path(path, "_metadata")
-      val commonSummaryPath = new Path(path, "_common_metadata")
+        val summaryPath = new Path(path, "_metadata")
+        val commonSummaryPath = new Path(path, "_common_metadata")
 
-      val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf())
-      fs.delete(summaryPath, true)
-      fs.delete(commonSummaryPath, true)
+        val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf())
+        fs.delete(summaryPath, true)
+        fs.delete(commonSummaryPath, true)
 
-      df.write.mode(SaveMode.Append).parquet(path)
-      checkAnswer(spark.read.parquet(path), df.union(df))
+        df.write.mode(SaveMode.Append).parquet(path)
+        checkAnswer(spark.read.parquet(path), df.union(df))
 
-      assert(fs.exists(summaryPath))
-      assert(fs.exists(commonSummaryPath))
+        assert(fs.exists(summaryPath))
+        assert(fs.exists(commonSummaryPath))
+      }
     }
   }
 
-- 
GitLab