Skip to content
Snippets Groups Projects
Commit bd167f83 authored by Andrey Kouznetsov's avatar Andrey Kouznetsov
Browse files

call setConf from input format if it is Configurable

parent c1e9cdc4
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,7 @@ import org.apache.hadoop.util.ReflectionUtils
import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
import spark.util.NextIterator
import org.apache.hadoop.conf.Configurable
/**
......@@ -50,6 +51,9 @@ class HadoopRDD[K, V](
override def getPartitions: Array[Partition] = {
val inputFormat = createInputFormat(conf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
}
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
......@@ -69,6 +73,9 @@ class HadoopRDD[K, V](
val conf = confBroadcast.value.value
val fmt = createInputFormat(conf)
if (fmt.isInstanceOf[Configurable]) {
fmt.asInstanceOf[Configurable].setConf(conf)
}
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
......
......@@ -3,7 +3,7 @@ package spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
......@@ -42,6 +42,9 @@ class NewHadoopRDD[K, V](
override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.newInstance
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(conf)
}
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
val result = new Array[Partition](rawSplits.size)
......@@ -57,6 +60,9 @@ class NewHadoopRDD[K, V](
val attemptId = new TaskAttemptID(jobtrackerId, id, true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
val format = inputFormatClass.newInstance
if (format.isInstanceOf[Configurable]) {
format.asInstanceOf[Configurable].setConf(conf)
}
val reader = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment