Skip to content
Snippets Groups Projects
Commit 1a175d13 authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Add NextIterator.closeIfNeeded.

parent 8f00d235
No related branches found
No related tags found
No related merge requests found
......@@ -72,7 +72,7 @@ class HadoopRDD[K, V](
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => close() }
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()
......
......@@ -5,6 +5,7 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
private var gotNext = false
private var nextValue: U = _
private var closed = false
protected var finished = false
/**
......@@ -34,12 +35,25 @@ private[spark] abstract class NextIterator[U] extends Iterator[U] {
*/
protected def close()
/**
* Calls the subclass-defined close method, but only once.
*
* Usually calling `close` multiple times should be fine, but historically
* there have been issues with some InputFormats throwing exceptions.
*/
def closeIfNeeded() {
if (!closed) {
close()
closed = true
}
}
override def hasNext: Boolean = {
if (!finished) {
if (!gotNext) {
nextValue = getNext()
if (finished) {
close()
closeIfNeeded()
}
gotNext = true
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment