diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index f6703986bdf11f123dfbb2b3d6d94b6917c9c986..376e69cd997d57f4a0a4c162bb32bd4d48e0bea0 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) } } } else { - logWarning ("No need to commit output of task: " + taID.value) + logInfo ("No need to commit output of task: " + taID.value) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f6d9d12fe900613dcded3f96bb26b7c424fd5bfa..51ba8c2d17834431fb2d520c5a3eff23d1a434fc 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -872,7 +872,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName) hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString) } - hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) + + // Use configured output committer if already set + if (conf.getOutputCommitter == null) { + hadoopConf.setOutputCommitter(classOf[FileOutputCommitter]) + } + FileOutputFormat.setOutputPath(hadoopConf, SparkHadoopWriter.createPathFromString(path, hadoopConf)) saveAsHadoopDataset(hadoopConf) diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 63d3ddb4af98a799493a4f110ee2833a8b5294d4..e84cc69592339f8f638ed5c763ba03c492383848 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -17,17 +17,21 @@ package org.apache.spark.rdd -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashSet +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.mapred._ +import org.apache.hadoop.util.Progressable + +import scala.collection.mutable.{ArrayBuffer, HashSet} import scala.util.Random -import org.scalatest.FunSuite import com.google.common.io.Files -import org.apache.hadoop.mapreduce._ -import org.apache.hadoop.conf.{Configuration, Configurable} - -import org.apache.spark.SparkContext._ +import org.apache.hadoop.conf.{Configurable, Configuration} +import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter, +OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, +TaskAttemptContext => NewTaskAttempContext} import org.apache.spark.{Partitioner, SharedSparkContext} +import org.apache.spark.SparkContext._ +import org.scalatest.FunSuite class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { test("aggregateByKey") { @@ -467,7 +471,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) // No error, non-configurable formats still work - pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored") + pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored") /* Check that configurable formats get configured: @@ -478,6 +482,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored") } + test("saveAsHadoopFile should respect configured output committers") { + val pairs = sc.parallelize(Array((new Integer(1), new Integer(1)))) + val conf = new JobConf() + conf.setOutputCommitter(classOf[FakeOutputCommitter]) + + FakeOutputCommitter.ran = false + pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf) + + assert(FakeOutputCommitter.ran, "OutputCommitter was never called") + } + test("lookup") { val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) @@ -621,40 +636,86 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile tries to instantiate them with Class.newInstance. */ + +/* + * Original Hadoop API + */ class FakeWriter extends RecordWriter[Integer, Integer] { + override def write(key: Integer, value: Integer): Unit = () - def close(p1: TaskAttemptContext) = () + override def close(reporter: Reporter): Unit = () +} + +class FakeOutputCommitter() extends OutputCommitter() { + override def setupJob(jobContext: JobContext): Unit = () + + override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true + + override def setupTask(taskContext: TaskAttemptContext): Unit = () + + override def commitTask(taskContext: TaskAttemptContext): Unit = { + FakeOutputCommitter.ran = true + () + } + + override def abortTask(taskContext: TaskAttemptContext): Unit = () +} + +/* + * Used to communicate state between the test harness and the OutputCommitter. + */ +object FakeOutputCommitter { + var ran = false +} + +class FakeOutputFormat() extends OutputFormat[Integer, Integer]() { + override def getRecordWriter( + ignored: FileSystem, + job: JobConf, name: String, + progress: Progressable): RecordWriter[Integer, Integer] = { + new FakeWriter() + } + + override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = () +} + +/* + * New-style Hadoop API + */ +class NewFakeWriter extends NewRecordWriter[Integer, Integer] { + + def close(p1: NewTaskAttempContext) = () def write(p1: Integer, p2: Integer) = () } -class FakeCommitter extends OutputCommitter { - def setupJob(p1: JobContext) = () +class NewFakeCommitter extends NewOutputCommitter { + def setupJob(p1: NewJobContext) = () - def needsTaskCommit(p1: TaskAttemptContext): Boolean = false + def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false - def setupTask(p1: TaskAttemptContext) = () + def setupTask(p1: NewTaskAttempContext) = () - def commitTask(p1: TaskAttemptContext) = () + def commitTask(p1: NewTaskAttempContext) = () - def abortTask(p1: TaskAttemptContext) = () + def abortTask(p1: NewTaskAttempContext) = () } -class FakeFormat() extends OutputFormat[Integer, Integer]() { +class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() { - def checkOutputSpecs(p1: JobContext) = () + def checkOutputSpecs(p1: NewJobContext) = () - def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { - new FakeWriter() + def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = { + new NewFakeWriter() } - def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = { - new FakeCommitter() + def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = { + new NewFakeCommitter() } } -class ConfigTestFormat() extends FakeFormat() with Configurable { +class ConfigTestFormat() extends NewFakeFormat() with Configurable { var setConfCalled = false def setConf(p1: Configuration) = { @@ -664,7 +725,7 @@ class ConfigTestFormat() extends FakeFormat() with Configurable { def getConf: Configuration = null - override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = { + override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = { assert(setConfCalled, "setConf was never called") super.getRecordWriter(p1) }