diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index aeeac65ccaaaf2634bc16b53b66da09f0ff37b6f..6701f24ff96da20481f697febc5ce12b6ae55147 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.mapred.FileOutputCommitter
 import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.HadoopWriter
+import org.apache.hadoop.mapred.SparkHadoopWriter
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapred.OutputFormat
 
@@ -653,7 +653,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
     }
     conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf))
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
     saveAsHadoopDataset(conf)
   }
 
@@ -679,7 +679,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " +
       valueClass.getSimpleName+ ")")
 
-    val writer = new HadoopWriter(conf)
+    val writer = new SparkHadoopWriter(conf)
     writer.preSetup()
 
     def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala
similarity index 96%
rename from core/src/main/scala/spark/HadoopWriter.scala
rename to core/src/main/scala/spark/SparkHadoopWriter.scala
index 60840ce77ec8b65417b8bc74e016daba45b82682..6b330ef5727be767df763dd61c42caa88012b353 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/SparkHadoopWriter.scala
@@ -36,7 +36,7 @@ import spark.SerializableWritable
  * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
  * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
  */
-class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
+class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
 
   private val now = new Date()
   private val conf = new SerializableWritable(jobConf)
@@ -165,7 +165,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop
     splitID = splitid
     attemptID = attemptid
 
-    jID = new SerializableWritable[JobID](HadoopWriter.createJobID(now, jobid))
+    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
     taID = new SerializableWritable[TaskAttemptID](
         new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
   }
@@ -179,7 +179,7 @@ class HadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoop
   }
 }
 
-object HadoopWriter {
+object SparkHadoopWriter {
   def createJobID(time: Date, id: Int): JobID = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     val jobtrackerID = formatter.format(new Date())
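
For context, the renamed `SparkHadoopWriter` sits behind `PairRDDFunctions.saveAsHadoopFile`, which the first file in this patch touches. Below is a minimal usage sketch of that call path; it is an illustration only, not part of the change, and the master string, app name, and output path (`/tmp/word-counts`) are assumptions.

```scala
import org.apache.hadoop.mapred.TextOutputFormat

import spark.SparkContext
import spark.SparkContext._  // implicit conversion to PairRDDFunctions for (K, V) RDDs

object SaveAsHadoopFileSketch {
  def main(args: Array[String]) {
    // Local master and app name are placeholders for illustration only.
    val sc = new SparkContext("local", "SaveAsHadoopFileSketch")

    // A small pair RDD; any (K, V) RDD picks up saveAsHadoopFile via PairRDDFunctions.
    val counts = sc.parallelize(Seq(("apple", 3), ("banana", 5)))

    // Builds a JobConf, sets the output path via SparkHadoopWriter.createPathFromString,
    // and writes each partition through a SparkHadoopWriter instance
    // (preSetup on the driver, then setup/write/commit per task).
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("/tmp/word-counts")

    sc.stop()
  }
}
```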