Commit 64fbdf1a authored by jiangxingbo, committed by Reynold Xin

[SPARK-18191][CORE][FOLLOWUP] Call `setConf` if `OutputFormat` is `Configurable`.

## What changes were proposed in this pull request?

We should call `setConf` if the `OutputFormat` is `Configurable`; this should be done before we create the `OutputCommitter` and the `RecordWriter`.
This is a follow-up of #15769; see the discussion [here](https://github.com/apache/spark/pull/15769/files#r87064229).
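
For context, here is a minimal sketch of the kind of `OutputFormat` this affects. It is not part of this patch, and `ConfDependentOutputFormat` is a hypothetical name: a format whose `getRecordWriter` only works once the Hadoop configuration has been injected through `setConf`, which is the same property the `ConfigTestFormat` test fixture below checks.

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// Hypothetical example, not part of this patch: an OutputFormat that is only
// usable after the Hadoop Configuration has been injected via setConf.
class ConfDependentOutputFormat[K, V] extends TextOutputFormat[K, V] with Configurable {
  private var conf: Configuration = _

  override def setConf(conf: Configuration): Unit = { this.conf = conf }
  override def getConf: Configuration = conf

  override def getRecordWriter(context: TaskAttemptContext): RecordWriter[K, V] = {
    // Without the setConf call added in this patch, Spark could reach this
    // point with `conf` still null for formats like this one.
    require(conf != null, "setConf must be called before getRecordWriter")
    super.getRecordWriter(context)
  }
}
```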

## How was this patch tested?

Add a test for this case in `PairRDDFunctionsSuite`.

Author: jiangxingbo <jiangxb1987@gmail.com>

Closes #15823 from jiangxb1987/config-format.
parent d8b81f77
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

@@ -19,6 +19,7 @@ package org.apache.spark.internal.io
 
 import java.util.Date
 
+import org.apache.hadoop.conf.Configurable
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
@@ -42,7 +43,13 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var committer: OutputCommitter = _
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    val format = context.getOutputFormatClass.newInstance()
+    // If OutputFormat is Configurable, we should set conf to it.
+    format match {
+      case c: Configurable => c.setConf(context.getConfiguration)
+      case _ => ()
+    }
+    format.getOutputCommitter(context)
   }
 
   override def newTaskTempFile(

core/src/main/scala/org/apache/spark/internal/io/SparkHadoopMapReduceWriter.scala

@@ -23,7 +23,7 @@ import java.util.{Date, Locale}
 import scala.reflect.ClassTag
 import scala.util.DynamicVariable
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, JobID}
 import org.apache.hadoop.mapreduce._
@@ -140,7 +140,12 @@ object SparkHadoopMapReduceWriter extends Logging {
       SparkHadoopWriterUtils.initHadoopOutputMetrics(context)
 
     // Initiate the writer.
-    val taskFormat = outputFormat.newInstance
+    val taskFormat = outputFormat.newInstance()
+    // If OutputFormat is Configurable, we should set conf to it.
+    taskFormat match {
+      case c: Configurable => c.setConf(hadoopConf)
+      case _ => ()
+    }
     val writer = taskFormat.getRecordWriter(taskContext)
       .asInstanceOf[RecordWriter[K, V]]
     require(writer != null, "Unable to obtain RecordWriter")

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala

@@ -509,6 +509,21 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       (2, ArrayBuffer(1))))
   }
 
+  test("saveNewAPIHadoopFile should call setConf if format is configurable") {
+    val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
+
+    // No error, non-configurable formats still work
+    pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
+
+    /*
+     * Check that configurable formats get configured:
+     * ConfigTestFormat throws an exception if we try to write
+     * to it when setConf hasn't been called first.
+     * Assertion is in ConfigTestFormat.getRecordWriter.
+     */
+    pairs.saveAsNewAPIHadoopFile[ConfigTestFormat]("ignored")
+  }
+
   test("saveAsHadoopFile should respect configured output committers") {
     val pairs = sc.parallelize(Array((new Integer(1), new Integer(1))))
     val conf = new JobConf()
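
Usage sketch (not part of this patch): assuming the hypothetical `ConfDependentOutputFormat` from the sketch above and an existing `SparkContext` named `sc` (e.g. in `spark-shell`), a configurable format now receives the job's Hadoop configuration before Spark asks it for an `OutputCommitter` or a `RecordWriter`.

```scala
// Hypothetical usage; the output path is a placeholder.
val pairs = sc.parallelize(Seq((1, "a"), (2, "b")))

// With this change, Spark calls setConf on the freshly instantiated format
// both in HadoopMapReduceCommitProtocol.setupCommitter (before
// getOutputCommitter) and in SparkHadoopMapReduceWriter (before
// getRecordWriter), so the require(conf != null) check above no longer trips.
pairs.saveAsNewAPIHadoopFile[ConfDependentOutputFormat[Int, String]]("/tmp/conf-format-demo")
```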