Commit a0454efe authored by Ian Hummel, committed by Patrick Wendell

[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile

Addresses the issue in https://issues.apache.org/jira/browse/SPARK-3595, namely saveAsHadoopFile hardcoding the OutputCommitter.  This is not ideal when running Spark jobs that write to S3, especially when running them from an EMR cluster where the default OutputCommitter is a DirectOutputCommitter.
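
For illustration, a minimal usage sketch (not part of this commit's diff): it assumes an existing SparkContext named sc, a pair RDD named pairs of (String, String) records, and some DirectOutputCommitter class on the classpath that extends org.apache.hadoop.mapred.OutputCommitter (for example, the one shipped on EMR clusters). With this change, the committer set on the JobConf is used as-is instead of being replaced with FileOutputCommitter:

    import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
    // Enables saveAsHadoopFile and friends on pair RDDs (Spark 1.x style).
    import org.apache.spark.SparkContext._

    // Start from the cluster's Hadoop configuration and plug in the desired committer.
    // DirectOutputCommitter is assumed to be provided elsewhere (e.g. by EMR).
    val jobConf = new JobConf(sc.hadoopConfiguration)
    jobConf.setOutputCommitter(classOf[DirectOutputCommitter])

    // After this patch, saveAsHadoopFile leaves the configured committer in place
    // rather than forcing FileOutputCommitter. The S3 path is a placeholder.
    pairs.saveAsHadoopFile(
      "s3n://my-bucket/output",
      classOf[String],
      classOf[String],
      classOf[TextOutputFormat[String, String]],
      jobConf)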

Author: Ian Hummel <ian@themodernlife.net>

Closes #2450 from themodernlife/spark-3595 and squashes the following commits:

f37a0e5 [Ian Hummel] Update based on comments from pwendell
a11d9f3 [Ian Hummel] Fix formatting
4359664 [Ian Hummel] Add an example showing usage
8b6be94 [Ian Hummel] Add ability to specify OutputCommitter, especially useful when writing to an S3 bucket from an EMR cluster
parent d112a6c7
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logWarning ("No need to commit output of task: " + taID.value)
+      logInfo ("No need to commit output of task: " + taID.value)
     }
   }
@@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+
+    // Use configured output committer if already set
+    if (conf.getOutputCommitter == null) {
+      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
+    }
+
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
@@ -17,17 +17,21 @@
 package org.apache.spark.rdd
 
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util.Progressable
+import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.Random
-import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.conf.{Configuration, Configurable}
-import org.apache.spark.SparkContext._
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
+  OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
+  TaskAttemptContext => NewTaskAttempContext}
 import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.spark.SparkContext._
+import org.scalatest.FunSuite
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -467,7 +471,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -478,6 +482,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
+  test("saveAsHadoopFile should respect configured output committers") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+    val conf = new JobConf()
+    conf.setOutputCommitter(classOf[FakeOutputCommitter])
+
+    FakeOutputCommitter.ran = false
+    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
+
+    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
+  }
+
   test("lookup") {
     val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
@@ -621,40 +636,86 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
   tries to instantiate them with Class.newInstance.
 */
+
+/*
+ * Original Hadoop API
+ */
 class FakeWriter extends RecordWriter[Integer, Integer] {
+  override def write(key: Integer, value: Integer): Unit = ()
 
-  def close(p1: TaskAttemptContext) = ()
+  override def close(reporter: Reporter): Unit = ()
+}
+
+class FakeOutputCommitter() extends OutputCommitter() {
+  override def setupJob(jobContext: JobContext): Unit = ()
+
+  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
+
+  override def commitTask(taskContext: TaskAttemptContext): Unit = {
+    FakeOutputCommitter.ran = true
+    ()
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
+}
+
+/*
+ * Used to communicate state between the test harness and the OutputCommitter.
+ */
+object FakeOutputCommitter {
+  var ran = false
+}
+
+class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
+  override def getRecordWriter(
+      ignored: FileSystem,
+      job: JobConf, name: String,
+      progress: Progressable): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
+  }
+
+  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
+}
+
+/*
+ * New-style Hadoop API
+ */
+class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
+
+  def close(p1: NewTaskAttempContext) = ()
 
   def write(p1: Integer, p2: Integer) = ()
 
 }
 
-class FakeCommitter extends OutputCommitter {
-  def setupJob(p1: JobContext) = ()
+class NewFakeCommitter extends NewOutputCommitter {
+  def setupJob(p1: NewJobContext) = ()
 
-  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
+  def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
 
-  def setupTask(p1: TaskAttemptContext) = ()
+  def setupTask(p1: NewTaskAttempContext) = ()
 
-  def commitTask(p1: TaskAttemptContext) = ()
+  def commitTask(p1: NewTaskAttempContext) = ()
 
-  def abortTask(p1: TaskAttemptContext) = ()
+  def abortTask(p1: NewTaskAttempContext) = ()
 }
 
-class FakeFormat() extends OutputFormat[Integer, Integer]() {
+class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: JobContext) = ()
+  def checkOutputSpecs(p1: NewJobContext) = ()
 
-  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
-    new FakeWriter()
+  def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+    new NewFakeWriter()
   }
 
-  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
-    new FakeCommitter()
+  def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
+    new NewFakeCommitter()
  }
 }
 
-class ConfigTestFormat() extends FakeFormat() with Configurable {
+class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   var setConfCalled = false
 
   def setConf(p1: Configuration) = {
@@ -664,7 +725,7 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
 
   def getConf: Configuration = null
 
-  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+  override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
     assert(setConfCalled, "setConf was never called")
     super.getRecordWriter(p1)
   }