Commit f5bf7ded authored by Patrick Wendell

Revert "[SPARK-3595] Respect configured OutputCommitters when calling saveAsHadoopFile"

This reverts commit 7a766577.

[NOTE: After some thought I decided not to merge this into 1.1 quite yet]
parent 7a766577
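For context, the patch being reverted changed PairRDDFunctions.saveAsHadoopFile to keep an OutputCommitter that the caller had already set on the JobConf, instead of unconditionally overwriting it with FileOutputCommitter; the hunks below restore the unconditional behaviour. A minimal sketch of the usage pattern the reverted patch enabled is shown here; MyOutputCommitter, the output format, and the helper method are illustrative only and are not part of this commit.

import org.apache.hadoop.mapred.{FileOutputCommitter, JobConf, TextOutputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._  // brings in the RDD-to-PairRDDFunctions implicit (pre-1.3 style)

// Hypothetical committer: any org.apache.hadoop.mapred.OutputCommitter subclass would do.
class MyOutputCommitter extends FileOutputCommitter

def saveWithConfiguredCommitter(sc: SparkContext, path: String): Unit = {
  val conf = new JobConf(sc.hadoopConfiguration)
  // The caller asks for a specific committer on the JobConf.
  conf.setOutputCommitter(classOf[MyOutputCommitter])

  val pairs = sc.parallelize(Seq((new Integer(1), new Integer(1))))
  // With SPARK-3595 applied, saveAsHadoopFile left the committer above in place;
  // after this revert it is reset to FileOutputCommitter before writing.
  pairs.saveAsHadoopFile(path, classOf[Integer], classOf[Integer],
    classOf[TextOutputFormat[Integer, Integer]], conf)
}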
@@ -116,7 +116,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
         }
       }
     } else {
-      logInfo ("No need to commit output of task: " + taID.value)
+      logWarning ("No need to commit output of task: " + taID.value)
     }
   }
...
@@ -872,12 +872,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
       hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-
-    // Use configured output committer if already set
-    if (conf.getOutputCommitter == null) {
-      hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    }
-
+    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
     FileOutputFormat.setOutputPath(hadoopConf,
       SparkHadoopWriter.createPathFromString(path, hadoopConf))
     saveAsHadoopDataset(hadoopConf)
...
@@ -17,21 +17,17 @@
 package org.apache.spark.rdd
 
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util.Progressable
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
 import scala.util.Random
 
+import org.scalatest.FunSuite
+
 import com.google.common.io.Files
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
-  OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
-  TaskAttemptContext => NewTaskAttempContext}
-
-import org.apache.spark.{Partitioner, SharedSparkContext}
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
 import org.apache.spark.SparkContext._
-import org.scalatest.FunSuite
+import org.apache.spark.{Partitioner, SharedSparkContext}
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -471,7 +467,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
 
     // No error, non-configurable formats still work
-    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+    pairs.saveAsNewAPIHadoopFile[FakeFormat]("ignored")
 
     /*
       Check that configurable formats get configured:
@@ -482,17 +478,6 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
   }
 
-  test("saveAsHadoopFile should respect configured output committers") {
-    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
-    val conf = new JobConf()
-    conf.setOutputCommitter(classOf[FakeOutputCommitter])
-
-    FakeOutputCommitter.ran = false
-    pairs.saveAsHadoopFile("ignored", pairs.keyClass, pairs.valueClass, classOf[FakeOutputFormat], conf)
-
-    assert(FakeOutputCommitter.ran, "OutputCommitter was never called")
-  }
-
   test("lookup") {
     val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7)))
@@ -636,86 +621,40 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   and the test will therefore throw InstantiationException when saveAsNewAPIHadoopFile
   tries to instantiate them with Class.newInstance.
  */
 
-/*
- * Original Hadoop API
- */
 class FakeWriter extends RecordWriter[Integer, Integer] {
-  override def write(key: Integer, value: Integer): Unit = ()
 
-  override def close(reporter: Reporter): Unit = ()
-}
-
-class FakeOutputCommitter() extends OutputCommitter() {
-  override def setupJob(jobContext: JobContext): Unit = ()
-
-  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = ()
-
-  override def commitTask(taskContext: TaskAttemptContext): Unit = {
-    FakeOutputCommitter.ran = true
-    ()
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = ()
-}
-
-/*
- * Used to communicate state between the test harness and the OutputCommitter.
- */
-object FakeOutputCommitter {
-  var ran = false
-}
-
-class FakeOutputFormat() extends OutputFormat[Integer, Integer]() {
-  override def getRecordWriter(
-    ignored: FileSystem,
-    job: JobConf, name: String,
-    progress: Progressable): RecordWriter[Integer, Integer] = {
-    new FakeWriter()
-  }
-
-  override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = ()
-}
-
-/*
- * New-style Hadoop API
- */
-class NewFakeWriter extends NewRecordWriter[Integer, Integer] {
-  def close(p1: NewTaskAttempContext) = ()
+  def close(p1: TaskAttemptContext) = ()
 
   def write(p1: Integer, p2: Integer) = ()
 
 }
 
-class NewFakeCommitter extends NewOutputCommitter {
-  def setupJob(p1: NewJobContext) = ()
+class FakeCommitter extends OutputCommitter {
+  def setupJob(p1: JobContext) = ()
 
-  def needsTaskCommit(p1: NewTaskAttempContext): Boolean = false
+  def needsTaskCommit(p1: TaskAttemptContext): Boolean = false
 
-  def setupTask(p1: NewTaskAttempContext) = ()
+  def setupTask(p1: TaskAttemptContext) = ()
 
-  def commitTask(p1: NewTaskAttempContext) = ()
+  def commitTask(p1: TaskAttemptContext) = ()
 
-  def abortTask(p1: NewTaskAttempContext) = ()
+  def abortTask(p1: TaskAttemptContext) = ()
 }
 
-class NewFakeFormat() extends NewOutputFormat[Integer, Integer]() {
+class FakeFormat() extends OutputFormat[Integer, Integer]() {
 
-  def checkOutputSpecs(p1: NewJobContext) = ()
+  def checkOutputSpecs(p1: JobContext) = ()
 
-  def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
-    new NewFakeWriter()
+  def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
+    new FakeWriter()
   }
 
-  def getOutputCommitter(p1: NewTaskAttempContext): NewOutputCommitter = {
-    new NewFakeCommitter()
+  def getOutputCommitter(p1: TaskAttemptContext): OutputCommitter = {
+    new FakeCommitter()
  }
 }
 
-class ConfigTestFormat() extends NewFakeFormat() with Configurable {
+class ConfigTestFormat() extends FakeFormat() with Configurable {
 
   var setConfCalled = false
 
   def setConf(p1: Configuration) = {
@@ -725,7 +664,7 @@ class ConfigTestFormat() extends NewFakeFormat() with Configurable {
 
   def getConf: Configuration = null
 
-  override def getRecordWriter(p1: NewTaskAttempContext): NewRecordWriter[Integer, Integer] = {
+  override def getRecordWriter(p1: TaskAttemptContext): RecordWriter[Integer, Integer] = {
     assert(setConfCalled, "setConf was never called")
     super.getRecordWriter(p1)
   }
...