Skip to content
Snippets Groups Projects
Commit 7bdc9219 authored by Herman van Hovell's avatar Herman van Hovell Committed by Sean Owen
Browse files

[SPARK-11449][CORE] PortableDataStream should be a factory

```PortableDataStream``` maintains some internal state. This makes it tricky to reuse a stream (one needs to call ```close``` on both the ```PortableDataStream``` and the ```InputStream``` it produces).

This PR removes all state from ```PortableDataStream``` and effectively turns it into an ```InputStream```/```Array[Byte]``` factory. This makes the user responsible for managing the ```InputStream``` it returns.

cc srowen

Author: Herman van Hovell <hvanhovell@questtec.nl>

Closes #9417 from hvanhovell/SPARK-11449.
parent 859dff56
No related branches found
No related tags found
No related merge requests found
......@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import scala.collection.JavaConverters._
import com.google.common.io.ByteStreams
import com.google.common.io.{Closeables, ByteStreams}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
......@@ -82,7 +82,6 @@ private[spark] abstract class StreamBasedRecordReader[T](
if (!processed) {
val fileIn = new PortableDataStream(split, context, index)
value = parseStream(fileIn)
fileIn.close() // if it has not been open yet, close does nothing
key = fileIn.getPath
processed = true
true
......@@ -134,12 +133,6 @@ class PortableDataStream(
index: Integer)
extends Serializable {
// transient forces file to be reopened after being serialization
// it is also used for non-serializable classes
@transient private var fileIn: DataInputStream = null
@transient private var isOpen = false
private val confBytes = {
val baos = new ByteArrayOutputStream()
SparkHadoopUtil.get.getConfigurationFromJobContext(context).
......@@ -175,40 +168,34 @@ class PortableDataStream(
}
/**
* Create a new DataInputStream from the split and context
* Create a new DataInputStream from the split and context. The user of this method is responsible
* for closing the stream after usage.
*/
def open(): DataInputStream = {
if (!isOpen) {
val pathp = split.getPath(index)
val fs = pathp.getFileSystem(conf)
fileIn = fs.open(pathp)
isOpen = true
}
fileIn
val pathp = split.getPath(index)
val fs = pathp.getFileSystem(conf)
fs.open(pathp)
}
/**
* Read the file as a byte array
*/
def toArray(): Array[Byte] = {
open()
val innerBuffer = ByteStreams.toByteArray(fileIn)
close()
innerBuffer
val stream = open()
try {
ByteStreams.toByteArray(stream)
} finally {
Closeables.close(stream, true)
}
}
/**
* Close the file (if it is currently open)
* Closing the PortableDataStream is not needed anymore. The user either can use the
* PortableDataStream to get a DataInputStream (which the user needs to close after usage),
* or a byte array.
*/
@deprecated("Closing the PortableDataStream is not needed anymore.", "1.6.0")
def close(): Unit = {
if (isOpen) {
try {
fileIn.close()
isOpen = false
} catch {
case ioe: java.io.IOException => // do nothing
}
}
}
def getPath(): String = path
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment