diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
index 376e69cd997d57f4a0a4c162bb32bd4d48e0bea0..f6703986bdf11f123dfbb2b3d6d94b6917c9c986 100644
--- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      logWarning ("No need to commit output of task: " + taID.value)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 51ba8c2d17834431fb2d520c5a3eff23d1a434fc..f6d9d12fe900613dcded3f96bb26b7c424fd5bfa 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -872,12 +872,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-
-    // Use configured output committer if already set
-    if (conf.getOutputCommitter == null) {
-      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    }
-
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e84cc69592339f8f638ed5c763ba03c492383848..63d3ddb4af98a799493a4f110ee2833a8b5294d4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -17,21 +17,17 @@
 
 package org.apache.spark.rdd
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util.Progressable
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
 import scala.util.Random
 
+import org.scalatest.FunSuite
 import com.google.common.io.Files
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
-OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
-TaskAttemptContext => NewTaskAttempContext}
-import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
 import org.apache.spark.SparkContext._
-import org.scalatest.FunSuite
+import org.apache.spark.{Partitioner, SharedSparkContext}
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -471,7 +467,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -482,17 +478,6 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
-  test("saveAsHadoopFile should respect configured output committers") {
-    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-    val conf = new JobConf()
-    conf.setOutputCommitter(classOf[FakeOutputCommitter])
-
-    FakeOutputCommitter.ran = false
-    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
-
-    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
-  }
-
   test("lookup") {
     val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
 
@@ -636,86 +621,40 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   and the test will therefore throw InstantiationException when
   saveAsNewAPIHadoopFile tries to instantiate them with Class.newInstance.
  */
-
-/*
- * Original Hadoop API
- */
 class FakeWriter extends RecordWriter[Integer, Integer] {
-  override def write(key: Integer, value: Integer): Unit = ()
 
-  override def close(reporter: Reporter): Unit = ()
-}
-
-class FakeOutputCommitter() extends OutputCommitter() {
-  override def setupJob(jobContext: JobContext): Unit = ()
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {
-    FakeOutputCommitter.ran = true
-    ()
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
-}
-
-/*
- * Used to communicate state between the test harness and the OutputCommitter.
- */
-object FakeOutputCommitter {
-  var ran = false
-}
-
-class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
-  override def getRecordWriter(
-    ignored: FileSystem,
-    job: JobConf, name: String,
-    progress: Progressable): RecordWriter[Integer, Integer] = {
-    new FakeWriter()
-  }
-
-  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
-}
-
-/*
- * New-style Hadoop API
- */
-class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
-
-  def close(p1: NewTaskAttempContext) = ()
+  def close(p1: TaskAttemptContext) = ()
 
   def write(p1: Integer, p2: Integer) = ()
 
 }
 
-class NewFakeCommitter extends NewOutputCommitter {
-  def setupJob(p1: NewJobContext) = ()
+class FakeCommitter extends OutputCommitter {
+  def setupJob(p1: JobContext) = ()
 
-  def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
+  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
 
-  def setupTask(p1: NewTaskAttempContext) = ()
+  def setupTask(p1: TaskAttemptContext) = ()
 
-  def commitTask(p1: NewTaskAttempContext) = ()
+  def commitTask(p1: TaskAttemptContext) = ()
 
-  def abortTask(p1: NewTaskAttempContext) = ()
+  def abortTask(p1: TaskAttemptContext) = ()
 }
 
-class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: NewJobContext) = ()
+  def checkOutputSpecs(p1: JobContext) = ()
 
-  def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
-    new NewFakeWriter()
+  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
   }
 
-  def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
-    new NewFakeCommitter()
+  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+    new FakeCommitter()
   }
 }
 
-class ConfigTestFormat() extends NewFakeFormat() with Configurable {
+class ConfigTestFormat() extends FakeFormat() with Configurable {
 
   var setConfCalled = false
   def setConf(p1: Configuration) = {
@@ -725,7 +664,7 @@ class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   def getConf: Configuration = null
 
-  override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
     assert(setConfCalled, "setConf was never called")
     super.getRecordWriter(p1)
   }
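
Note (not part of the patch): a minimal sketch of the caller-facing effect of the PairRDDFunctions change, assuming an existing SparkContext named sc; the output path, key/value classes, and output format below are illustrative only. With this diff applied, saveAsHadoopFile unconditionally installs FileOutputCommitter, so a committer set on the JobConf beforehand is overridden rather than respected.

import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf, TextOutputFormat}

// Illustrative JobConf; FileOutputCommitter stands in for any custom committer a caller might set.
val conf = new JobConf()
conf.setOutputCommitter(classOf[FileOutputCommitter]) // overridden inside saveAsHadoopFile after this change

// saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) from PairRDDFunctions (old Hadoop API)
sc.parallelize(Seq((1, "a"), (2, "b")))
  .saveAsHadoopFile("/tmp/out", classOf[java.lang.Integer], classOf[String],
    classOf[TextOutputFormat[java.lang.Integer, String]], conf)

Callers needing a non-default committer with the old API would presumably configure the JobConf themselves and call saveAsHadoopDataset, which this patch does not touch.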