diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 9107c9b67681fb46fbad1c8e547c2a67d8686ea7..2786e3d2cd6bf4bcd6f907cdff18a136c1e24abf 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1348,6 +1348,34 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
     support.
   </td>
 </tr>
+<tr>
+  <td><code>spark.sql.parquet.output.committer.class</code></td>
+  <td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
+  <td>
+    <p>
+      The output committer class used by Parquet. The specified class needs to be a subclass of
+      <code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>.  Typically, it's also a
+      subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
+    </p>
+    <p>
+      <b>Note:</b>
+      <ul>
+        <li>
+          This option must be set via Hadoop <code>Configuration</code> rather than Spark
+          <code>SQLConf</code>.
+        </li>
+        <li>
+          This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
+        </li>
+      </ul>
+    </p>
+    <p>
+      Spark SQL comes with a built-in
+      <code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
+      efficient than the default Parquet output committer when writing data to S3.
+    </p>
+  </td>
+</tr>
 </table>
 
 ## JSON Datasets
@@ -1876,7 +1904,7 @@ that these options will be deprecated in future release as more optimizations ar
       Configures the number of partitions to use when shuffling data for joins or aggregations.
     </td>
   </tr>
-   <tr>
+  <tr>
     <td><code>spark.sql.planner.externalSort</code></td>
     <td>false</td>
     <td>
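
As a rough sketch of how the new option is meant to be set, i.e. through the Hadoop Configuration rather than SQLConf (the application name below is a placeholder):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("parquet-direct-commit"))

// The committer class is looked up in the Hadoop Configuration, so setting it
// via sqlContext.setConf(...) would have no effect.
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

val sqlContext = new SQLContext(sc)
// Subsequent Parquet writes commit task output directly to the destination
// directory instead of going through a _temporary folder first.
```
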
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 16493c3d7c19ce58648a1108e914a599130197f0..265352647fa9f0b30a24ba37ad55643368bf123d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -22,6 +22,8 @@ import java.util.Properties
 import scala.collection.immutable
 import scala.collection.JavaConversions._
 
+import org.apache.parquet.hadoop.ParquetOutputCommitter
+
 import org.apache.spark.sql.catalyst.CatalystConf
 
 private[spark] object SQLConf {
@@ -252,9 +254,9 @@ private[spark] object SQLConf {
 
   val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
     defaultValue = Some(false),
-    doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
-      " because of a known bug in Paruet 1.6.0rc3 " +
-      "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
+    doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default " +
+      "because of a known bug in Parquet 1.6.0rc3 " +
+      "(PARQUET-136, https://issues.apache.org/jira/browse/PARQUET-136). However, " +
       "if your table doesn't contain any nullable string or binary columns, it's still safe to " +
       "turn this feature on.")
 
@@ -262,11 +264,21 @@ private[spark] object SQLConf {
     defaultValue = Some(true),
     doc = "<TODO>")
 
+  val PARQUET_OUTPUT_COMMITTER_CLASS = stringConf(
+    key = "spark.sql.parquet.output.committer.class",
+    defaultValue = Some(classOf[ParquetOutputCommitter].getName),
+    doc = "The output committer class used by Parquet. The specified class needs to be a " +
+      "subclass of org.apache.hadoop.mapreduce.OutputCommitter.  Typically, it's also a subclass " +
+      "of org.apache.parquet.hadoop.ParquetOutputCommitter.  NOTE: 1. Instead of SQLConf, this " +
+      "option must be set in Hadoop Configuration.  2. This option overrides " +
+      "\"spark.sql.sources.outputCommitterClass\"."
+  )
+
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
     doc = "<TODO>")
 
-  val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+  val HIVE_VERIFY_PARTITION_PATH = booleanConf("spark.sql.hive.verifyPartitionPath",
     defaultValue = Some(true),
     doc = "<TODO>")
 
@@ -325,9 +337,13 @@ private[spark] object SQLConf {
       defaultValue = Some(true),
       doc = "<TODO>")
 
-  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // The output committer class used by HadoopFsRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
-  // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
+  //
+  // NOTE:
+  //
+  //  1. Instead of SQLConf, this option *must be set in Hadoop Configuration*.
+  //  2. This option can be overridden by "spark.sql.parquet.output.committer.class".
   val OUTPUT_COMMITTER_CLASS =
     stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
@@ -415,7 +431,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   /** When true uses verifyPartitionPath to prune the path which is not exists. */
-  private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
+  private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
 
   /** When true the planner will use the external sort, which may spill to disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
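
To make the precedence described above concrete, a minimal sketch; `sc` is an existing SparkContext and `com.example.MyOutputCommitter` is a hypothetical OutputCommitter subclass used only for illustration:

```scala
// Both keys live in the Hadoop Configuration, not in SQLConf.
sc.hadoopConfiguration.set(
  "spark.sql.sources.outputCommitterClass",    // generic committer for HadoopFsRelation writes
  "com.example.MyOutputCommitter")
sc.hadoopConfiguration.set(
  "spark.sql.parquet.output.committer.class",  // takes precedence for Parquet writes
  "org.apache.parquet.hadoop.ParquetOutputCommitter")
```
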
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
index 62c4e92ebec681cce0d222d43ae2044b7a8e5ea1..1551afd7b7bf27837397359d0d2ffa00f1ab8074 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/DirectParquetOutputCommitter.scala
@@ -17,19 +17,35 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.Log
 import org.apache.parquet.hadoop.util.ContextUtil
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
 
+/**
+ * An output committer for writing Parquet files.  Instead of writing to the `_temporary` folder
+ * as [[ParquetOutputCommitter]] does, this output committer writes data directly to the
+ * destination folder.  This can be useful for data stored in S3, where directory operations are
+ * relatively expensive.
+ *
+ * To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
+ * property via Hadoop [[Configuration]].  Note that this property overrides
+ * "spark.sql.sources.outputCommitterClass".
+ *
+ * *NOTE*
+ *
+ *   NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
+ *   no safe way to undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
+ *   left empty).
+ */
 private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
   extends ParquetOutputCommitter(outputPath, context) {
   val LOG = Log.getLog(classOf[ParquetOutputCommitter])
 
-  override def getWorkPath(): Path = outputPath
+  override def getWorkPath: Path = outputPath
   override def abortTask(taskContext: TaskAttemptContext): Unit = {}
   override def commitTask(taskContext: TaskAttemptContext): Unit = {}
   override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
@@ -46,13 +62,11 @@ private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: T
         val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
         try {
           ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
-        } catch {
-          case e: Exception => {
-            LOG.warn("could not write summary file for " + outputPath, e)
-            val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
-            if (fileSystem.exists(metadataPath)) {
-              fileSystem.delete(metadataPath, true)
-            }
+        } catch { case e: Exception =>
+          LOG.warn("could not write summary file for " + outputPath, e)
+          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
+          if (fileSystem.exists(metadataPath)) {
+            fileSystem.delete(metadataPath, true)
           }
         }
       } catch {
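
Given the warning in the scaladoc above that `abortTask()` and `abortJob()` are no-ops, a cautious usage sketch, assuming the committer has been configured as in the earlier sketch and `df` is an existing DataFrame (the destination path is a placeholder):

```scala
import org.apache.spark.sql.SaveMode

// A failed job cannot be rolled back with this committer, so restrict it to
// writes that fully replace the destination and avoid SaveMode.Append.
df.write
  .mode(SaveMode.Overwrite)
  .parquet("s3n://bucket/path")
```
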
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e049d54bf55dca0836e863014c1c9713e9a3936c..1d353bd8e1114464739b130ad27a55023a984e72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -178,11 +178,11 @@ private[sql] class ParquetRelation2(
 
     val committerClass =
       conf.getClass(
-        "spark.sql.parquet.output.committer.class",
+        SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
         classOf[ParquetOutputCommitter],
         classOf[ParquetOutputCommitter])
 
-    if (conf.get("spark.sql.parquet.output.committer.class") == null) {
+    if (conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key) == null) {
       logInfo("Using default output committer for Parquet: " +
         classOf[ParquetOutputCommitter].getCanonicalName)
     } else {