From c337844ed7f9b2cb7b217dc935183ef5e1096ca1 Mon Sep 17 00:00:00 2001
From: Cheng Lian <lian@databricks.com>
Date: Thu, 25 Jun 2015 00:06:23 -0700
Subject: [PATCH] [SPARK-8604] [SQL] HadoopFsRelation subclasses should set
 their output format class

`HadoopFsRelation` subclasses, especially `ParquetRelation2`, should set their own output format class, so that the default output committer can be set up correctly when appending (where user-defined output committers are ignored).
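
For a data source built on `HadoopFsRelation`, the fix amounts to declaring the
output format inside `prepareJobForWrite`, so that an append job falls back to the
committer bundled with that output format. A minimal sketch of the pattern follows
(the enclosing relation class and `MyOutputWriter` are placeholders for
illustration, not part of this patch):

    import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.sql.sources.{OutputWriter, OutputWriterFactory}
    import org.apache.spark.sql.types.StructType

    // Inside a HadoopFsRelation subclass:
    override def prepareJobForWrite(job: Job): OutputWriterFactory = {
      // Declare the output format this relation writes with; its associated
      // output committer becomes the default committer used when appending.
      job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])

      new OutputWriterFactory {
        override def newInstance(
            path: String,
            dataSchema: StructType,
            context: TaskAttemptContext): OutputWriter =
          new MyOutputWriter(path, dataSchema, context) // placeholder writer
      }
    }

`OrcRelation` uses the old `mapred` API, so it sets the format through
`JobConf.setOutputFormat` (or the `mapred.output.format.class` key) instead, as
the `OrcRelation.scala` hunk below shows.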

Author: Cheng Lian <lian@databricks.com>

Closes #6998 from liancheng/spark-8604 and squashes the following commits:

9be51d1 [Cheng Lian] Adds more comments
6db1368 [Cheng Lian] HadoopFsRelation subclasses should set their output format class
---
 .../apache/spark/sql/parquet/newParquet.scala |  6 ++++++
 .../spark/sql/hive/orc/OrcRelation.scala      | 12 ++++++++++-
 .../sql/sources/SimpleTextRelation.scala      |  2 ++
 .../sql/sources/hadoopFsRelationSuites.scala  | 21 +++++++++++++++++++
 4 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1d353bd8e1..bc39fae2bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -194,6 +194,12 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning.  The reason why
+    // we set it here is to set the output committer class to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 705f48f1cd..0fd7b3a91d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
@@ -194,6 +194,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 5d7cd16c12..e8141923a9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
         path: String,
         dataSchema: StructType,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index a16ab3a00d..afecf9675e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }
-- 
GitLab