Commit 9e4b4bd0 authored by Patrick Wendell

Revert "SPARK-2038: rename "conf" parameters in the saveAsHadoop functions"

This reverts commit 443f5e1b.

The reverted commit would unfortunately break source compatibility: renaming a public
parameter breaks any call site that passes that argument by name.
parent d2f4f30b
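
For context, a minimal sketch of the call pattern at stake (hypothetical user code: `sc`, the sample data, and the output path are placeholders). Renaming the parameter from `conf` to `hadoopConf` breaks any call site like this one, which passes the configuration by name:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.{JobConf, TextOutputFormat}
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._ // implicit conversion to PairRDDFunctions

    val sc: SparkContext = ??? // an existing SparkContext
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    // Compiles against the released signature (`conf: JobConf = ...`), but
    // would stop compiling once the parameter is renamed to `hadoopConf`:
    pairs.saveAsHadoopFile(
      "hdfs:///tmp/out",
      classOf[Text],
      classOf[IntWritable],
      classOf[TextOutputFormat[Text, IntWritable]],
      conf = new JobConf(sc.hadoopConfiguration))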
@@ -719,9 +719,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      hadoopConf: Configuration = self.context.hadoopConfiguration)
+      conf: Configuration = self.context.hadoopConfiguration)
   {
-    val job = new NewAPIHadoopJob(hadoopConf)
+    val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
     job.setOutputFormatClass(outputFormatClass)
@@ -752,25 +752,24 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    hadoopConf.setOutputKeyClass(keyClass)
-    hadoopConf.setOutputValueClass(valueClass)
+    conf.setOutputKeyClass(keyClass)
+    conf.setOutputValueClass(valueClass)
     // Doesn't work in Scala 2.9 due to what may be a generics bug
     // TODO: Should we uncomment this for Scala 2.10?
     // conf.setOutputFormat(outputFormatClass)
-    hadoopConf.set("mapred.output.format.class", outputFormatClass.getName)
+    conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
-      hadoopConf.setCompressMapOutput(true)
-      hadoopConf.set("mapred.output.compress", "true")
-      hadoopConf.setMapOutputCompressorClass(c)
-      hadoopConf.set("mapred.output.compression.codec", c.getCanonicalName)
-      hadoopConf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
-    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(hadoopConf,
-      SparkHadoopWriter.createPathFromString(path, hadoopConf))
-    saveAsHadoopDataset(hadoopConf)
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
   }
 
   /**
@@ -779,8 +778,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * output paths required (e.g. a table name to write to) in the same way as it would be
    * configured for a Hadoop MapReduce job.
    */
-  def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {
-    val job = new NewAPIHadoopJob(hadoopConf)
+  def saveAsNewAPIHadoopDataset(conf: Configuration) {
+    val job = new NewAPIHadoopJob(conf)
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
     val stageId = self.id
@@ -836,10 +835,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
    * MapReduce job.
    */
-  def saveAsHadoopDataset(hadoopConf: JobConf) {
-    val outputFormatInstance = hadoopConf.getOutputFormat
-    val keyClass = hadoopConf.getOutputKeyClass
-    val valueClass = hadoopConf.getOutputValueClass
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatInstance = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
     if (outputFormatInstance == null) {
       throw new SparkException("Output format class not set")
     }
@@ -849,18 +848,18 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     if (valueClass == null) {
       throw new SparkException("Output value class not set")
     }
-    SparkHadoopUtil.get.addCredentials(hadoopConf)
+    SparkHadoopUtil.get.addCredentials(conf)
 
     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
 
     if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
       // FileOutputFormat ignores the filesystem parameter
-      val ignoredFs = FileSystem.get(hadoopConf)
-      hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
+      val ignoredFs = FileSystem.get(conf)
+      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
     }
 
-    val writer = new SparkHadoopWriter(hadoopConf)
+    val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
...
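
As the Scaladoc in the hunks above notes, the Dataset variants expect a Configuration or JobConf that already carries the output format and output path. A minimal usage sketch for saveAsNewAPIHadoopDataset, under the same placeholder assumptions as the earlier example (`sc`, `pairs`, and the output path are hypothetical):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

    // Build a Job purely to populate its Configuration; Spark performs the
    // write itself rather than submitting a MapReduce job.
    val job = new Job(sc.hadoopConfiguration) // Job.getInstance(...) on newer Hadoop
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("hdfs:///tmp/out-new"))

    pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)

The older saveAsHadoopDataset works the same way with a JobConf, as the saveAsHadoopFile hunk above illustrates: output format, key/value classes, committer, and path are all set on the JobConf before the call.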