Commit d942d390 authored by Matei Zaharia

Handle exceptions in RecordReader.close() better (suggested by Jim Donahue)
parent c8982404
HadoopRDD.scala

@@ -15,7 +15,7 @@
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils

-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}

 /**
@@ -42,7 +42,7 @@ class HadoopRDD[K, V](
     keyClass: Class[K],
     valueClass: Class[V],
     minSplits: Int)
-  extends RDD[(K, V)](sc, Nil) {
+  extends RDD[(K, V)](sc, Nil) with Logging {

   // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -71,7 +71,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)

     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback{ () => close() }

     val key: K = reader.createKey()
     val value: V = reader.createValue()
@@ -88,9 +88,6 @@
         }
         gotNext = true
       }
-      if (finished) {
-        reader.close()
-      }
       !finished
     }
@@ -104,6 +101,14 @@
       gotNext = false
       (key, value)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }

   override def getPreferredLocations(split: Partition): Seq[String] = {
...
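The change routes every close of the RecordReader through a private close() helper that catches and logs the exception instead of letting it escape the iterator or the task-completion callback. Below is a minimal standalone sketch of that guarded-close pattern; FaultyReader, readAll, and the simplified logWarning are illustrative stand-ins, not Spark or Hadoop APIs, and only the try/catch shape mirrors the new HadoopRDD.close() helper.

// Sketch of the guarded-close pattern introduced by this commit.
object SafeCloseSketch {
  class FaultyReader {
    def next(): Option[String] = None                      // pretend the input is empty
    def close(): Unit = throw new RuntimeException("boom") // close() itself fails
  }

  private def logWarning(msg: String, e: Exception): Unit =
    System.err.println(s"WARN: $msg: ${e.getMessage}")

  // Close the reader without letting its failure mask the task's real result.
  private def close(reader: FaultyReader): Unit = {
    try {
      reader.close()
    } catch {
      case e: Exception => logWarning("Exception in RecordReader.close()", e)
    }
  }

  def readAll(reader: FaultyReader): Seq[String] = {
    try {
      Iterator.continually(reader.next()).takeWhile(_.isDefined).flatten.toSeq
    } finally {
      close(reader) // logs the failure but still returns the records read
    }
  }

  def main(args: Array[String]): Unit = {
    println(readAll(new FaultyReader())) // prints List(), plus a WARN on stderr
  }
}

The point of the sketch is that readAll still returns normally even though close() threw. Before this change, reader.close() was called directly from the completion callback and again from hasNext once the input was exhausted, so a failure in close() could surface as a task error even after all records had been read.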
NewHadoopRDD.scala

@@ -7,7 +7,7 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._

-import spark.{Dependency, RDD, SerializableWritable, SparkContext, Partition, TaskContext}
+import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}

 private[spark]
@@ -26,7 +26,8 @@ class NewHadoopRDD[K, V](
     valueClass: Class[V],
     @transient conf: Configuration)
   extends RDD[(K, V)](sc, Nil)
-  with HadoopMapReduceUtil {
+  with HadoopMapReduceUtil
+  with Logging {

   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@@ -61,7 +62,7 @@ class NewHadoopRDD[K, V](
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)

     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => reader.close())
+    context.addOnCompleteCallback(() => close())

     var havePair = false
     var finished = false
@@ -81,6 +82,14 @@
       havePair = false
       return (reader.getCurrentKey, reader.getCurrentValue)
     }
+
+    private def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
   }

   override def getPreferredLocations(split: Partition): Seq[String] = {
...
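NewHadoopRDD.scala gets the identical fix. The guard matters there for the same reason: close() now runs inside an on-task-completion callback, after the task's useful work is already done. The toy registry below illustrates that interaction; CompletionCallbacks stands in for Spark's TaskContext callback list and is not its actual implementation.

import scala.collection.mutable.ArrayBuffer

// Illustration only: a toy callback registry playing the role of TaskContext.
// It shows why each callback should swallow its own cleanup failures.
class CompletionCallbacks {
  private val callbacks = new ArrayBuffer[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = callbacks += f
  def markTaskCompleted(): Unit = callbacks.foreach(f => f()) // a throwing callback aborts the rest
}

object CallbackSketch {
  def main(args: Array[String]): Unit = {
    val context = new CompletionCallbacks

    // Guarded close, as in the commit: log the failure and keep going.
    context.addOnCompleteCallback { () =>
      try {
        throw new RuntimeException("RecordReader.close() failed")
      } catch {
        case e: Exception =>
          System.err.println("WARN: Exception in RecordReader.close(): " + e.getMessage)
      }
    }

    // Unrelated cleanup registered by someone else still gets to run.
    context.addOnCompleteCallback(() => println("other cleanup still runs"))

    context.markTaskCompleted()
  }
}

How Spark's TaskContext of this era behaves when one registered callback throws is not shown here; the sketch only makes the point that a close() guarded with try/catch never forces that question.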