Commit 1b97a941 authored by Cheng Lian, committed by Michael Armbrust

[SPARK-3007][SQL] Fixes dynamic partitioning support for lower Hadoop versions

This is a follow-up to #2226 and #2616 that fixes Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x).

The root cause is a difference in the semantics of `FileSystem.globStatus()` across Hadoop versions, as illustrated by the following test code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)

  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}
```

Target directory structure:

```
/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0
```
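
For reproducibility, a minimal setup sketch (the `GlobExperimentsSetup` name is illustrative) that creates this tree on the local file system before running `GlobExperiments`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Creates the directory tree shown above on the local file system so that
// GlobExperiments can be run against it.
object GlobExperimentsSetup extends App {
  val fs = FileSystem.getLocal(new Configuration())
  Seq("dir0/dir1/level2", "dir0/level1", "level0").foreach { dir =>
    fs.mkdirs(new Path(s"/tmp/wh/$dir"))
  }
}
```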

Hadoop 2.4.1 result:

```
file:/tmp/wh/dir0/dir1/level2
```

Hadoop 1.0.4 result:

```
file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0
```

In #2226 and #2616, we call `FileOutputCommitter.commitJob()` at the end of the job, which writes the `_SUCCESS` marker file into the output directory. On lower Hadoop versions, `globStatus()` also matches paths shallower than the glob pattern (note the `level1` and `level0` entries above), so the `_SUCCESS` marker is picked up by `Hive.loadDynamicPartitions()` as a separate partition data file and fails the partition spec check. The fix introduced in this PR is kind of a hack: when inserting data with dynamic partitioning, we intentionally avoid writing the `_SUCCESS` marker to work around this issue.
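
The workaround boils down to flipping the committer's success-marker flag around the commit call. A minimal sketch (the helper name and the `commitJob` thunk are illustrative; the real change lives in `SparkHiveDynamicPartitionWriterContainer.commitJob()` in the diff below):

```scala
import org.apache.hadoop.mapred.JobConf

object SuccessMarkerWorkaround {
  // Hadoop's FileOutputCommitter consults this property to decide whether to
  // create the _SUCCESS marker when committing a job.
  val MarkSuccessfulJobs = "mapreduce.fileoutputcommitter.marksuccessfuljobs"

  // Illustrative helper: run a commit action with the _SUCCESS marker disabled,
  // restoring the previous setting afterwards.
  def commitWithoutSuccessMarker(jobConf: JobConf)(commitJob: () => Unit): Unit = {
    val oldMarker = jobConf.getBoolean(MarkSuccessfulJobs, true)
    jobConf.setBoolean(MarkSuccessfulJobs, false)
    try {
      commitJob()
    } finally {
      jobConf.setBoolean(MarkSuccessfulJobs, oldMarker)
    }
  }
}
```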

Hive doesn't suffer from this issue because `FileSinkOperator` doesn't call `FileOutputCommitter.commitJob()`. Instead, it calls `Utilities.mvFileToFinalPath()` to clean up the output directory and then loads it into the Hive warehouse with `loadDynamicPartitions()`/`loadPartition()`/`loadTable()`. This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to `InsertIntoHiveTable` in another PR.
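
For reference, a schematic sketch of that staging-then-load flow built from plain Hadoop `FileSystem` calls; the method, its parameters, and the `loadIntoWarehouse` callback are assumptions for illustration and do not reflect the actual signatures of `Utilities.mvFileToFinalPath()` or `Hive.loadDynamicPartitions()`:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object StagingPromotionSketch {
  // Move a job's staging output to its final location, then hand the final
  // directory to a metastore-loading callback (e.g. loadDynamicPartitions).
  // Hive's real implementation additionally de-duplicates output from
  // speculative tasks and cleans up after failed jobs.
  def promoteStagingOutput(
      conf: Configuration,
      stagingDir: Path,
      finalDir: Path)(loadIntoWarehouse: Path => Unit): Unit = {
    val fs = FileSystem.get(conf)
    // Drop leftovers from a previous failed attempt before moving into place.
    if (fs.exists(finalDir)) {
      fs.delete(finalDir, true)
    }
    if (!fs.rename(stagingDir, finalDir)) {
      sys.error(s"Failed to move $stagingDir to $finalDir")
    }
    loadIntoWarehouse(finalDir)
  }
}
```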

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2663 from liancheng/dp-hadoop-1-fix and squashes the following commits:

0177dae [Cheng Lian] Fixes dynamic partitioning support for lower Hadoop versions
```diff
@@ -55,8 +55,8 @@ private[hive] class SparkHiveWriterContainer(
   private var taID: SerializableWritable[TaskAttemptID] = null
 
   @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient private lazy val committer = conf.value.getOutputCommitter
-  @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+  @transient protected lazy val committer = conf.value.getOutputCommitter
+  @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
   @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
   @transient private lazy val outputFormat =
     conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
@@ -122,8 +122,6 @@ private[hive] class SparkHiveWriterContainer(
     }
   }
 
-  // ********* Private Functions *********
-
   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
     jobID = jobId
     splitID = splitId
@@ -157,12 +155,18 @@ private[hive] object SparkHiveWriterContainer {
   }
 }
 
+private[spark] object SparkHiveDynamicPartitionWriterContainer {
+  val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
+}
+
 private[spark] class SparkHiveDynamicPartitionWriterContainer(
     @transient jobConf: JobConf,
     fileSinkConf: FileSinkDesc,
     dynamicPartColNames: Array[String])
   extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
 
+  import SparkHiveDynamicPartitionWriterContainer._
+
   private val defaultPartName = jobConf.get(
     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
 
@@ -179,6 +183,20 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     commit()
   }
 
+  override def commitJob(): Unit = {
+    // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
+    // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
+    // include _SUCCESS file when glob'ing for dynamic partition data files.
+    //
+    // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
+    // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
+    // load it with loadDynamicPartitions/loadPartition/loadTable.
+    val oldMarker = jobConf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
+    super.commitJob()
+    jobConf.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
+  }
+
   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
     val dynamicPartPath = dynamicPartColNames
       .zip(row.takeRight(dynamicPartColNames.length))
```