diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 60ee115e393ceb8262bcbe973d8e529b66fd3a00..57f9faf5ddd1d09f9768162df047010f995ace0e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation @@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging { Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") statisticsDataClass.getDeclaredMethod(methodName) } + + /** + * Using reflection to get the Configuration from JobContext/TaskAttemptContext. If we directly + * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes + * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+ + * while it's interface in Hadoop 2.+. + */ + def getConfigurationFromJobContext(context: JobContext): Configuration = { + val method = context.getClass.getMethod("getConfiguration") + method.invoke(context).asInstanceOf[Configuration] + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 89b29af2000c8b91650da93c9e83aeb1da877fe2..c219d21fbefa90b19304de97f5559b2ba8c2e754 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} +import org.apache.spark.deploy.SparkHadoopUtil /** * Custom Input Format for reading and splitting flat binary files that contain records, @@ -33,7 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat { /** Retrieves the record length property from a Hadoop configuration */ def getRecordLength(context: JobContext): Int = { - context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt + SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt } } diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala index 36a1e5d475f46ea4bc2ce5acf18623ca63b0c493..67a96925da0195143513c4ad09f05de5beeec215 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.io.{BytesWritable, LongWritable} import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.spark.deploy.SparkHadoopUtil /** * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat. @@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader // the actual file we will be reading from val file = fileSplit.getPath // job configuration - val job = context.getConfiguration + val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context) // check compression val codec = new CompressionCodecFactory(job).getCodec(file) if (codec != null) { diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 457472547fcbb6036ac22190d0bf02372cc27758..593a62b3e3b3288c0587e32b66f9e36da0f8cae6 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit} import org.apache.spark.annotation.Experimental +import org.apache.spark.deploy.SparkHadoopUtil /** * A general format for reading whole files in as streams, byte arrays, @@ -145,7 +146,8 @@ class PortableDataStream( private val confBytes = { val baos = new ByteArrayOutputStream() - context.getConfiguration.write(new DataOutputStream(baos)) + SparkHadoopUtil.get.getConfigurationFromJobContext(context). + write(new DataOutputStream(baos)) baos.toByteArray } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 1b1131b9b8831d591d3c9ee229f597a9bd8dd568..31bde8a78f3c60c5102acfc518bb4f8e12c5aa7a 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader} import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.deploy.SparkHadoopUtil /** @@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader( extends RecordReader[String, String] with Configurable { private[this] val path = split.getPath(index) - private[this] val fs = path.getFileSystem(context.getConfiguration) + private[this] val fs = path.getFileSystem( + SparkHadoopUtil.get.getConfigurationFromJobContext(context)) // True means the current file has been processed, then skip it. private[this] var processed = false diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 214f22bc5b603c603ef60ad85189812b096a0794..a94206963b52feff3863ca76d9ca200293dad541 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1174,7 +1174,20 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String) { - this.map(x => (NullWritable.get(), new Text(x.toString))) + // https://issues.apache.org/jira/browse/SPARK-2075 + // + // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit + // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` + // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an + // Ordering for `NullWritable`. That's why the compiler will generate different anonymous + // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. + // + // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate + // same bytecodes for `saveAsTextFile`. + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } @@ -1182,7 +1195,11 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a compressed text file, using string representations of elements. */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { - this.map(x => (NullWritable.get(), new Text(x.toString))) + // https://issues.apache.org/jira/browse/SPARK-2075 + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) }