Commit d7d54a44 authored by Davies Liu, committed by Josh Rosen

[SPARK-2672] support compressed file in wholeTextFile

wholeTextFiles() cannot read compressed files; it should be able to, just like textFile().

Author: Davies Liu <davies@databricks.com>

Closes #3005 from davies/whole and squashes the following commits:

a43fcfb [Davies Liu] remove semicolon
c83571a [Davies Liu] remove = if return type is Unit
83c844f [Davies Liu] Merge branch 'master' of github.com:apache/spark into whole
22e8b3e [Davies Liu] support compressed file in wholeTextFile
parent bd86118c
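Below is a minimal usage sketch of the behaviour this patch enables, assuming a local SparkContext and a hypothetical directory /tmp/logs containing gzip-compressed text files (the object and path names are illustrative, not part of the commit):

    import org.apache.spark.{SparkConf, SparkContext}

    object WholeTextFilesGzipExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("whole-text-gzip").setMaster("local"))

        // With this patch, wholeTextFiles() decompresses any file whose extension
        // matches a registered codec (e.g. *.gz), just as textFile() already does.
        val files = sc.wholeTextFiles("/tmp/logs")   // hypothetical directory of *.gz files
        files.collect().foreach { case (path, contents) =>
          println(s"$path -> ${contents.length} chars")
        }

        sc.stop()
      }
    }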
core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala
@@ -19,14 +19,13 @@ package org.apache.spark.input
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.{Configuration, Configurable}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
/**
* A [[org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat CombineFileInputFormat]] for
@@ -34,17 +33,24 @@ import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
* the value is the entire content of file.
*/
private[spark] class WholeTextFileInputFormat extends CombineFileInputFormat[String, String] {
private[spark] class WholeTextFileInputFormat
extends CombineFileInputFormat[String, String] with Configurable {
override protected def isSplitable(context: JobContext, file: Path): Boolean = false
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[String, String] = {
new CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader])
val reader = new WholeCombineFileRecordReader(split, context)
reader.setConf(conf)
reader
}
/**
......
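The reason the input format above now mixes in Configurable: whoever instantiates it can detect the mixin and hand over the job Configuration through setConf, and createRecordReader then forwards that conf to the record reader. A self-contained sketch of the Hadoop convention this relies on, ReflectionUtils.newInstance calling setConf on any Configurable object it constructs (the class names are made up for illustration):

    import org.apache.hadoop.conf.{Configurable, Configuration}
    import org.apache.hadoop.util.ReflectionUtils

    // Stand-in for a Configurable input format: ReflectionUtils.newInstance calls
    // setConf(...) on a freshly constructed object that implements Configurable,
    // so a job Configuration reaches it without being a constructor argument.
    class ConfAwareFormat extends Configurable {
      private var conf: Configuration = _
      override def setConf(c: Configuration): Unit = { conf = c }
      override def getConf: Configuration = conf
    }

    object ConfPropagationSketch extends App {
      val conf = new Configuration()
      conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")

      val format = ReflectionUtils.newInstance(classOf[ConfAwareFormat], conf)
      println(format.getConf.get("io.compression.codecs"))   // the conf arrived via setConf
    }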
core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,11 +17,13 @@
package org.apache.spark.input
import org.apache.hadoop.conf.{Configuration, Configurable}
import com.google.common.io.{ByteStreams, Closeables}
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader}
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -34,7 +36,13 @@ private[spark] class WholeTextFileRecordReader(
split: CombineFileSplit,
context: TaskAttemptContext,
index: Integer)
extends RecordReader[String, String] {
extends RecordReader[String, String] with Configurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
private[this] val path = split.getPath(index)
private[this] val fs = path.getFileSystem(context.getConfiguration)
@@ -57,8 +65,16 @@ private[spark] class WholeTextFileRecordReader(
override def nextKeyValue(): Boolean = {
if (!processed) {
val conf = new Configuration
val factory = new CompressionCodecFactory(conf)
val codec = factory.getCodec(path) // infers from file ext.
val fileIn = fs.open(path)
val innerBuffer = ByteStreams.toByteArray(fileIn)
val innerBuffer = if (codec != null) {
ByteStreams.toByteArray(codec.createInputStream(fileIn))
} else {
ByteStreams.toByteArray(fileIn)
}
value = new Text(innerBuffer).toString
Closeables.close(fileIn, false)
processed = true
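The decompression added above hinges on one Hadoop call, CompressionCodecFactory.getCodec(path), which picks a codec purely from the file extension and returns null for files it does not recognise. A standalone sketch of that lookup (the paths are hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.compress.CompressionCodecFactory

    object CodecLookupSketch extends App {
      val conf = new Configuration()
      conf.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")
      val factory = new CompressionCodecFactory(conf)

      // A non-null codec means "wrap the stream in codec.createInputStream",
      // null means "read the bytes as-is" -- exactly the branch in nextKeyValue().
      println(Option(factory.getCodec(new Path("/tmp/part-00000.gz"))))   // Some(GzipCodec instance)
      println(Option(factory.getCodec(new Path("/tmp/part-00000"))))      // None: plain text
    }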
@@ -68,3 +84,33 @@ private[spark] class WholeTextFileRecordReader(
}
}
}
/**
* A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single whole text file
* out in a key-value pair, where the key is the file path and the value is the entire content of
* the file.
*/
private[spark] class WholeCombineFileRecordReader(
split: InputSplit,
context: TaskAttemptContext)
extends CombineFileRecordReader[String, String](
split.asInstanceOf[CombineFileSplit],
context,
classOf[WholeTextFileRecordReader]
) with Configurable {
private var conf: Configuration = _
def setConf(c: Configuration) {
conf = c
}
def getConf: Configuration = conf
override def initNextRecordReader(): Boolean = {
val r = super.initNextRecordReader()
if (r) {
this.curReader.asInstanceOf[WholeTextFileRecordReader].setConf(conf)
}
r
}
}
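The initNextRecordReader override is needed because CombineFileRecordReader creates one per-file reader after another reflectively, through the (CombineFileSplit, TaskAttemptContext, Integer) constructor, so the Configuration cannot be handed in at construction time and must be re-applied to every new child reader. A generic sketch of that forwarding pattern (these classes are illustrative, not Spark's):

    import org.apache.hadoop.conf.{Configurable, Configuration}

    // A child whose only way to receive a Configuration is setConf after construction.
    class LazyChild extends Configurable {
      private var conf: Configuration = _
      override def setConf(c: Configuration): Unit = { conf = c }
      override def getConf: Configuration = conf
    }

    // A parent that builds children lazily, one per input chunk, and pushes its
    // conf down right after each construction -- the same step the override adds.
    class LazyParent(conf: Configuration, chunks: Iterator[String]) {
      private var current: LazyChild = _

      def initNextChild(): Boolean = {
        if (!chunks.hasNext) return false
        chunks.next()               // the chunk this child will be responsible for
        current = new LazyChild     // stands in for the reflective construction
        current.setConf(conf)       // forward the conf, as initNextRecordReader does
        true
      }

      def currentChild: LazyChild = current
    }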
core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
import org.apache.spark.SparkContext
import org.apache.spark.util.Utils
import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodecFactory, GzipCodec}
/**
* Tests the correctness of
@@ -38,20 +39,32 @@ import org.apache.spark.util.Utils
*/
class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
private var sc: SparkContext = _
private var factory: CompressionCodecFactory = _
override def beforeAll() {
sc = new SparkContext("local", "test")
// Set the block size of local file system to test whether files are split right or not.
sc.hadoopConfiguration.setLong("fs.local.block.size", 32)
sc.hadoopConfiguration.set("io.compression.codecs",
"org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec")
factory = new CompressionCodecFactory(sc.hadoopConfiguration)
}
override def afterAll() {
sc.stop()
}
private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte]) = {
val out = new DataOutputStream(new FileOutputStream(s"${inputDir.toString}/$fileName"))
private def createNativeFile(inputDir: File, fileName: String, contents: Array[Byte],
compress: Boolean) = {
val out = if (compress) {
val codec = new GzipCodec
val path = s"${inputDir.toString}/$fileName${codec.getDefaultExtension}"
codec.createOutputStream(new DataOutputStream(new FileOutputStream(path)))
} else {
val path = s"${inputDir.toString}/$fileName"
new DataOutputStream(new FileOutputStream(path))
}
out.write(contents, 0, contents.length)
out.close()
}
@@ -68,7 +81,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
println(s"Local disk address is ${dir.toString}.")
WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
createNativeFile(dir, filename, contents)
createNativeFile(dir, filename, contents, false)
}
val res = sc.wholeTextFiles(dir.toString, 3).collect()
@@ -86,6 +99,31 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
Utils.deleteRecursively(dir)
}
test("Correctness of WholeTextFileRecordReader with GzipCodec.") {
val dir = Utils.createTempDir()
println(s"Local disk address is ${dir.toString}.")
WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
createNativeFile(dir, filename, contents, true)
}
val res = sc.wholeTextFiles(dir.toString, 3).collect()
assert(res.size === WholeTextFileRecordReaderSuite.fileNames.size,
"Number of files read out does not fit with the actual value.")
for ((filename, contents) <- res) {
val shortName = filename.split('/').last.split('.')(0)
assert(WholeTextFileRecordReaderSuite.fileNames.contains(shortName),
s"Missing file name $filename.")
assert(contents === new Text(WholeTextFileRecordReaderSuite.files(shortName)).toString,
s"file $filename contents can not match.")
}
Utils.deleteRecursively(dir)
}
}
/**
......