diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 1d353bd8e1114464739b130ad27a55023a984e72..bc39fae2bcfdeb2d0bf83adb4632b3426c3244f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -194,6 +194,12 @@ private[sql] class ParquetRelation2(
       committerClass,
       classOf[ParquetOutputCommitter])
 
+    // We're not really using `ParquetOutputFormat[Row]` to write data here, because we override
+    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. We only set it
+    // here so that the output committer class defaults to `ParquetOutputCommitter`, which is
+    // bundled with `ParquetOutputFormat[Row]`.
+    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
+
     // TODO There's no need to use two kinds of WriteSupport
     // We should unify them. `SpecificMutableRow` can process both atomic (primitive) types and
     // complex types.
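
The change above relies on Hadoop resolving the job-level output committer from the configured output format: once `ParquetOutputFormat[Row]` is registered on the `Job`, the committer obtained from it is a `ParquetOutputCommitter`, which writes the `_metadata` and `_common_metadata` summary files at job commit. Below is a minimal sketch of that resolution path, not part of the patch; the Parquet package name is an assumption (older builds ship the classes under `parquet.hadoop`, newer ones under `org.apache.parquet.hadoop`), and the output path is just a placeholder.

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{Job, TaskAttemptID}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.ParquetOutputFormat  // `parquet.hadoop` on older Parquet builds

object CommitterResolutionSketch {
  def main(args: Array[String]): Unit = {
    val job = Job.getInstance()
    // Mirrors ParquetRelation2.prepareJobForWrite: the format is never asked for a
    // RecordWriter by Spark, but it is asked for its output committer.
    job.setOutputFormatClass(classOf[ParquetOutputFormat[_]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/committer-sketch"))  // placeholder path

    // Instantiate the configured format reflectively, the way the MapReduce framework does,
    // and ask it for the committer that will run at task/job commit time.
    val format = job.getOutputFormatClass.newInstance()
    val context = new TaskAttemptContextImpl(job.getConfiguration, new TaskAttemptID())
    println(format.getOutputCommitter(context).getClass.getName)
    // Expected: a ParquetOutputCommitter, which writes _metadata/_common_metadata on commitJob.
  }
}
```
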
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 705f48f1cd9f09d7770b025dbfd9d4eac11fa64c..0fd7b3a91d6ddde5d2acdcdbb5c1184677d49cfc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.io.orc.{OrcInputFormat, OrcOutputFormat, OrcSer
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 import org.apache.hadoop.io.{NullWritable, Writable}
-import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, RecordWriter, Reporter}
+import org.apache.hadoop.mapred.{InputFormat => MapRedInputFormat, JobConf, OutputFormat => MapRedOutputFormat, RecordWriter, Reporter}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
@@ -194,6 +194,16 @@ private[sql] class OrcRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
+    job.getConfiguration match {
+      case conf: JobConf =>
+        conf.setOutputFormat(classOf[OrcOutputFormat])
+      case conf =>
+        conf.setClass(
+          "mapred.output.format.class",
+          classOf[OrcOutputFormat],
+          classOf[MapRedOutputFormat[_, _]])
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
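
The ORC write path goes through the old `mapred` API, so the output format has to be registered on the old-API side as well. The pattern match above prefers the typed `JobConf` setter and falls back to writing the raw configuration key when `prepareJobForWrite` only receives a plain `Configuration`. Here is a standalone sketch of the same pattern, using a hypothetical helper name and the old-API `TextOutputFormat` as a stand-in for `OrcOutputFormat`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat => MapRedOutputFormat, TextOutputFormat}

object OldApiOutputFormatSketch {
  // Hypothetical helper mirroring OrcRelation.prepareJobForWrite: use the typed JobConf
  // setter when the configuration actually is a JobConf, otherwise set the old-API
  // key ("mapred.output.format.class") directly.
  def setOldApiOutputFormat(conf: Configuration): Unit = conf match {
    case jobConf: JobConf =>
      jobConf.setOutputFormat(classOf[TextOutputFormat[_, _]])
    case other =>
      other.setClass(
        "mapred.output.format.class",
        classOf[TextOutputFormat[_, _]],
        classOf[MapRedOutputFormat[_, _]])
  }

  def main(args: Array[String]): Unit = {
    val plain = new Configuration()
    setOldApiOutputFormat(plain)
    println(plain.get("mapred.output.format.class"))  // both branches end up setting this key

    val jobConf = new JobConf()
    setOldApiOutputFormat(jobConf)
    println(jobConf.get("mapred.output.format.class"))
  }
}
```
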
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 5d7cd16c129cd236c02f0141ab5ccce4526d143b..e8141923a9b5c6524c3ccc0b70043d6b2ef84755 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -119,6 +119,8 @@ class SimpleTextRelation(
   }
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = new OutputWriterFactory {
+    job.setOutputFormatClass(classOf[TextOutputFormat[_, _]])
+
     override def newInstance(
         path: String,
         dataSchema: StructType,
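
In `SimpleTextRelation` the added line sits directly in the body of the anonymous `OutputWriterFactory`, so it runs once, on the driver, when `prepareJobForWrite` constructs the factory, not when `newInstance` is later invoked for each task. A small sketch of that Scala idiom, with hypothetical `Factory`/`prepare` names:

```scala
object AnonymousBodySketch {
  trait Factory { def newInstance(): String }

  def prepare(log: StringBuilder): Factory = new Factory {
    // A statement in the anonymous class body runs at construction time,
    // just like job.setOutputFormatClass(...) in prepareJobForWrite above.
    log.append("configured during prepare; ")
    override def newInstance(): String = "writer"
  }

  def main(args: Array[String]): Unit = {
    val log = new StringBuilder
    val factory = prepare(log)     // the append has already happened by this point
    println(log.result())          // "configured during prepare; "
    println(factory.newInstance()) // "writer"
  }
}
```
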
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index a16ab3a00ddb87d46b55f7787ff577b0be6c8921..afecf9675e11f8ebc1ddf1e6cca31553da956a57 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -719,4 +719,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
       }
     }
   }
+
+  test("SPARK-8604: Parquet data source should write summary file while doing appending") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(0, 5)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+
+      val summaryPath = new Path(path, "_metadata")
+      val commonSummaryPath = new Path(path, "_common_metadata")
+
+      val fs = summaryPath.getFileSystem(configuration)
+      fs.delete(summaryPath, true)
+      fs.delete(commonSummaryPath, true)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df.unionAll(df))
+
+      assert(fs.exists(summaryPath))
+      assert(fs.exists(commonSummaryPath))
+    }
+  }
 }