diff --git a/core/src/main/scala/spark/HadoopFile.scala b/core/src/main/scala/spark/HadoopFile.scala
index 0a7996c7bdcc708ddc49ba41919c65ba2ba05a37..beb53ce1a54338be2cedef755eea9da9b1b522e7 100644
--- a/core/src/main/scala/spark/HadoopFile.scala
+++ b/core/src/main/scala/spark/HadoopFile.scala
@@ -3,6 +3,7 @@ package spark
 import mesos.SlaveOffer
 
 import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
@@ -51,6 +52,15 @@ extends RDD[(K, V)](sc) {
       .asInstanceOf[InputFormat[K, V]]
   }
 
+  // Helper method for creating a Hadoop Writable, because the commonly used
+  // NullWritable class has no constructor
+  def createWritable[T](clazz: Class[T]): T = {
+    if (clazz == classOf[NullWritable])
+      NullWritable.get().asInstanceOf[T]
+    else
+      clazz.newInstance()
+  }
+
   override def splits = splits_
 
   override def compute(theSplit: Split) = new Iterator[(K, V)] {
@@ -63,8 +73,8 @@ extends RDD[(K, V)](sc) {
     val fmt = createInputFormat(conf)
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
-    val key: K = keyClass.newInstance()
-    val value: V = valueClass.newInstance()
+    val key: K = createWritable(keyClass)
+    val value: V = createWritable(valueClass)
     var gotNext = false
     var finished = false
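
Note on why the helper is needed: NullWritable is a singleton whose constructor is private, so the reflective newInstance() call used to instantiate the other Writable key/value classes throws for it; the only way to obtain the instance is the static accessor NullWritable.get(). The standalone sketch below reproduces the helper's logic in isolation to make this visible. The object name CreateWritableDemo and the main driver are illustrative only (not part of the patch), and it assumes the Hadoop core jar is on the classpath.

import org.apache.hadoop.io.{LongWritable, NullWritable}

object CreateWritableDemo {
  // Same special-casing as the patched helper: return the NullWritable
  // singleton, otherwise fall back to the no-arg constructor.
  def createWritable[T](clazz: Class[T]): T = {
    if (clazz == classOf[NullWritable])
      NullWritable.get().asInstanceOf[T]
    else
      clazz.newInstance()
  }

  def main(args: Array[String]): Unit = {
    // Works: LongWritable has a public no-arg constructor.
    val key: LongWritable = createWritable(classOf[LongWritable])
    // Works: NullWritable comes back via its singleton accessor.
    val nul: NullWritable = createWritable(classOf[NullWritable])
    println(s"key=$key, singleton=${nul eq NullWritable.get()}")  // key=0, singleton=true

    // Without the special case this line would throw, because the
    // constructor of NullWritable is private:
    // classOf[NullWritable].newInstance()
  }
}

With this in place, a HadoopFile RDD whose key or value class is NullWritable (common for sequence files that only carry data on one side of the pair) can be computed without reflection errors, while all other Writable types take the old newInstance() path unchanged.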