diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index ac5c7a8220296efb41f5bb5a2e2e0009a91ffd48..6ccbc22a4acfb11b5a24d166659e496bf6191a51 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private lazy val committer = conf.value.getOutputCommitter
-  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
   @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
@@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer(
     }
   }
 
-  // ********* Private Functions *********
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer {
   }
 }
 
+private[spark] object SparkHiveDynamicPartitionWriterContainer {
+  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+}
+
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
 
+  import SparkHiveDynamicPartitionWriterContainer._
+
   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
 
@@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     commit()
   }
 
+  override def commitJob(): Unit = {
+    // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
+    // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
+    // include _SUCCESS file when glob'ing for dynamic partition data files.
+    //
+    // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
+    // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
+    // load it with loadDynamicPartitions/loadPartition/loadTable.
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    super.commitJob()
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+  }
+
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
     val dynamicPartPath = dynamicPartColNames
       .zip(row.takeRight(dynamicPartColNames.length))