diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
index 183bce3d8d8d32e90ba526bb102a2853f79d00c5..d3601cca832b2a4fdf0ec26825a7ff38b3fbdb57 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
 
 import scala.collection.JavaConversions._
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
 
 /**
  * A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
  * the value is the entire content of file.
  */
 
-private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
+private[spark] class WholeTextFileInputFormat
+  extends CombineFileInputFormat[String, String] with Configurable {
+
   override protected def isSplitable(context: JobContext, file: Path): Boolean = false
 
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
   override def createRecordReader(
       split: InputSplit,
       context: TaskAttemptContext): RecordReader[String, String] = {
 
-    new CombineFileRecordReader[String, String](
-      split.asInstanceOf[CombineFileSplit],
-      context,
-      classOf[WholeTextFileRecordReader])
+    val reader = new WholeCombineFileRecordReader(split, context)
+    reader.setConf(conf)
+    reader
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 3564ab2e2a162f57bb19711f37617245b13506c4..6d59b24eb0596971baf7cf7e815e93be26a9e977 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
 
 package org.apache.spark.input
 
+import org.apache.hadoop.conf.{Configuration, Configurable}
 import com.google.common.io.{ByteStreams, Closeables}
 
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.hadoop.mapreduce.InputSplit
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
+import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
 import org.apache.hadoop.mapreduce.RecordReader
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
     split: CombineFileSplit,
     context: TaskAttemptContext,
     index: Integer)
-  extends RecordReader[String, String] {
+  extends RecordReader[String, String] with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
 
   private[this] val path = split.getPath(index)
   private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
 
   override def nextKeyValue(): Boolean = {
     if (!processed) {
+      val conf = new Configuration
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)  // infers from file ext.
       val fileIn = fs.open(path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = if (codec != null) {
+        ByteStreams.toByteArray(codec.createInputStream(fileIn))
+      } else {
+        ByteStreams.toByteArray(fileIn)
+      }
+
       value = new Text(innerBuffer).toString
       Closeables.close(fileIn, false)
       processed = true
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
     }
   }
 }
+
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file.
+ */
+private[spark] class WholeCombineFileRecordReader(
+    split: InputSplit,
+    context: TaskAttemptContext)
+  extends CombineFileRecordReader[String, String](
+    split.asInstanceOf[CombineFileSplit],
+    context,
+    classOf[WholeTextFileRecordReader]
+  ) with Configurable {
+
+  private var conf: Configuration = _
+  def setConf(c: Configuration) {
+    conf = c
+  }
+  def getConf: Configuration = conf
+
+  override def initNextRecordReader(): Boolean = {
+    val r = super.initNextRecordReader()
+    if (r) {
+      this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
+    }
+    r
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
index 12d1c7b2faba628d8bb330eb6c417a7e1df3d187..98b0a16ce88ba6ea3e9e98b94a273204580a2f03 100644
--- a/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 
 import org.apache.spark.SparkContext
 import org.apache.spark.util.Utils
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
 
 /**
  * Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
  */
 class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
   private var sc: SparkContext = _
+  private var factory: CompressionCodecFactory = _
 
   override def beforeAll() {
     sc = new SparkContext("local", "test")
 
     // Set the block size of local file system to test whether files are split right or not.
     sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
+    sc.hadoopConfiguration.set("io.compression.codecs",
+      "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
+    factory = new CompressionCodecFactory(sc.hadoopConfiguration)
   }
 
   override def afterAll() {
     sc.stop()
   }
 
-  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
-    val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
+  private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
+    compress: Boolean) = {
+    val out = if (compress) {
+      val codec = new GzipCodec
+      val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
+      codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
+    } else {
+      val path = s"${inputDir.toString}/$fileName"
+      new DataOutputStream(new FileOutputStream(path))
+    }
     out.write(contents, 0, contents.length)
     out.close()
   }
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
-      createNativeFile(dir, filename, contents)
+      createNativeFile(dir, filename, contents, false)
     }
 
     val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
 
     Utils.deleteRecursively(dir)
   }
+
+  test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
+    val dir = Utils.createTempDir()
+    println(s"Local disk address is ${dir.toString}.")
+
+    WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
+      createNativeFile(dir, filename, contents, true)
+    }
+
+    val res = sc.wholeTextFiles(dir.toString, 3).collect()
+
+    assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
+      "Number of files read out does not fit with the actual value.")
+
+    for ((filename, contents) <- res) {
+      val shortName = filename.split('/').last.split('.')(0)
+
+      assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
+        s"Missing file name $filename.")
+      assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
+        s"file $filename contents can not match.")
+    }
+
+    Utils.deleteRecursively(dir)
+  }
 }
 
 /**
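For context, a minimal usage sketch of what this patch enables, not part of the diff itself: after the change, `sc.wholeTextFiles` returns decompressed contents for gzip files because `WholeTextFileRecordReader` infers the codec from the `.gz` extension, as the new GzipCodec test above exercises. The directory path and object name below are hypothetical.

```scala
import org.apache.spark.SparkContext

// Hypothetical standalone example; assumes /tmp/whole-text-gz contains files such as part-00000.gz.
object WholeTextFilesGzipExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "wholeTextFiles-gzip-example")

    // Each record is (file path, full decompressed file contents); the codec is
    // chosen per file by CompressionCodecFactory based on the file extension.
    val files = sc.wholeTextFiles("/tmp/whole-text-gz", 3)
    files.collect().foreach { case (path, contents) =>
      println(s"$path -> ${contents.length} characters")
    }

    sc.stop()
  }
}
```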