Commit 3e29e372 authored by Andrew Or

[SPARK-14468] Always enable OutputCommitCoordinator

## What changes were proposed in this pull request?

`OutputCommitCoordinator` was introduced to deal with concurrent task attempts racing to write output, leading to data loss or corruption. For more detail, read the [JIRA description](https://issues.apache.org/jira/browse/SPARK-14468).
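For intuition, the coordinator implements a first-committer-wins protocol: each task attempt asks the driver for permission before committing its output, and only one attempt per partition is ever authorized. The sketch below is a hypothetical, simplified illustration of that idea; the class and method names are illustrative and are not Spark's actual internals.

```scala
// Hypothetical sketch of driver-side "first committer wins" bookkeeping.
// Not Spark's actual OutputCommitCoordinator implementation.
class NaiveCommitCoordinator {
  // (stage, partition) -> attempt number that was authorized to commit
  private val authorized = scala.collection.mutable.Map[(Int, Int), Int]()

  def canCommit(stage: Int, partition: Int, attempt: Int): Boolean = synchronized {
    authorized.get((stage, partition)) match {
      case Some(winner) => winner == attempt // only the first winner may commit
      case None =>
        authorized((stage, partition)) = attempt // first asker wins
        true
    }
  }
}
```

Without this arbitration, two concurrent attempts for the same partition could both pass `needsTaskCommit` and race to commit, which is the data-loss scenario described in the JIRA.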

Before: `OutputCommitCoordinator` is enabled only if speculation is enabled.
After: `OutputCommitCoordinator` is always enabled.

Users may still disable this through `spark.hadoop.outputCommitCoordination.enabled`, but they really shouldn't...
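For reference, the escape hatch is set like any other Spark configuration; a minimal sketch (the app name is a placeholder):

```scala
import org.apache.spark.SparkConf

// Escape hatch only: disables driver-side commit coordination entirely,
// reintroducing the risk of concurrent attempts corrupting output.
val conf = new SparkConf()
  .setAppName("example")
  .set("spark.hadoop.outputCommitCoordination.enabled", "false")
```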

## How was this patch tested?

`OutputCommitCoordinator*Suite`

Author: Andrew Or <andrew@databricks.com>

Closes #12244 from andrewor14/always-occ.
parent 30e980ad
@@ -33,11 +33,8 @@ object SparkHadoopMapRedUtil extends Logging {
    * the driver in order to determine whether this attempt can commit (please see SPARK-4879 for
    * details).
    *
-   * Output commit coordinator is only contacted when the following two configurations are both set
-   * to `true`:
-   *
-   * - `spark.speculation`
-   * - `spark.hadoop.outputCommitCoordination.enabled`
+   * Output commit coordinator is only used when `spark.hadoop.outputCommitCoordination.enabled`
+   * is set to true (which is the default).
    */
   def commitTask(
       committer: MapReduceOutputCommitter,
@@ -64,11 +61,10 @@ object SparkHadoopMapRedUtil extends Logging {
     if (committer.needsTaskCommit(mrTaskContext)) {
       val shouldCoordinateWithDriver: Boolean = {
         val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", defaultValue = false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
+        // We only need to coordinate with the driver if there are concurrent task attempts.
+        // Note that this could happen even when speculation is not enabled (e.g. see SPARK-8029).
+        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs.
+        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", defaultValue = true)
       }

       if (shouldCoordinateWithDriver) {
@@ -38,7 +38,7 @@ class OutputCommitCoordinatorIntegrationSuite
     super.beforeAll()
     val conf = new SparkConf()
       .set("master", "local[2,4]")
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
       .set("spark.hadoop.mapred.output.committer.class",
         classOf[ThrowExceptionOnFirstAttemptOutputCommitter].getCanonicalName)
     sc = new SparkContext("local[2, 4]", "test", conf)
@@ -77,7 +77,7 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName(classOf[OutputCommitCoordinatorSuite].getSimpleName)
-      .set("spark.speculation", "true")
+      .set("spark.hadoop.outputCommitCoordination.enabled", "true")
     sc = new SparkContext(conf) {
       override private[spark] def createSparkEnv(
           conf: SparkConf,