diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 97fec7f737dd798aa0a01d0708606702f248feb7..bceeaa04482b745682912f1617a5b6719ad02b9f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -385,14 +385,14 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]]
+      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        minSplits,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      minSplits,
       cloneKeyValues = cloneKeyValues)
   }
 
@@ -409,15 +409,15 @@ class SparkContext(
     hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String,
-      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]
-      ): RDD[(K, V)] = {
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+      (path: String, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
-        path,
-        fm.runtimeClass.asInstanceOf[Class[F]],
-        km.runtimeClass.asInstanceOf[Class[K]],
-        vm.runtimeClass.asInstanceOf[Class[V]],
-        cloneKeyValues = cloneKeyValues)
+      path,
+      fm.runtimeClass.asInstanceOf[Class[F]],
+      km.runtimeClass.asInstanceOf[Class[K]],
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 13949a1bdb464bb278355dcf7ee47e2fc4e160a9..2da4611b9c0ff64689a958d4ede91c7681c61886 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -22,6 +22,7 @@ import java.io.EOFException
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -91,7 +92,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits, cloneKeyValues)
+      minSplits,
+      cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -162,10 +164,10 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
 
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
-
       val key: K = reader.createKey()
+      val keyCloneFunc = cloneWritables[K](getConf)
       val value: V = reader.createValue()
-
+      val valueCloneFunc = cloneWritables[V](getConf)
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -174,7 +176,8 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
             finished = true
         }
         if (cloneKeyValues) {
-          (cloneWritables(key, getConf), cloneWritables(value, getConf))
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
         } else {
           (key, value)
         }
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 5428fc5691d6e444ed56077105f54ac98d633f08..e1f9995a9a12afff6489d6a2d8728492fe03681d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -92,7 +92,8 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
 
     // Register an on-task-completion callback to close the input stream.
     context.addOnCompleteCallback(() => close())
-
+    val keyCloneFunc = cloneWritables[K](conf)
+    val valueCloneFunc = cloneWritables[V](conf)
     var havePair = false
     var finished = false
 
@@ -112,9 +113,11 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
       val key = reader.getCurrentKey
       val value = reader.getCurrentValue
       if (cloneKeyValues) {
-        (cloneWritables(key, conf), cloneWritables(value, conf))
-      } else
-        (key, value)
+        (keyCloneFunc(key.asInstanceOf[Writable]),
+          valueCloneFunc(value.asInstanceOf[Writable]))
+      } else {
+        (key, value)
+      }
     }
 
     private def close() {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 192806e1782c1de56e4bd2f793b46023366a395f..23b72701c2c14322b467ae061cd4ecec4e08b095 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -51,15 +51,20 @@ private[spark] object Utils extends Logging {
    * intention is to optimize, for example for NullWritable there is no need and for Long, int and
    * String creating a new object with value set would be faster.
    */
-  def cloneWritables[T: ClassTag](obj: T, conf: Configuration): T = {
-    val cloned = classTag[T] match {
-      case ClassTag(_: Text) => new Text(obj.asInstanceOf[Text].getBytes)
-      case ClassTag(_: LongWritable) => new LongWritable(obj.asInstanceOf[LongWritable].get)
-      case ClassTag(_: IntWritable) => new IntWritable(obj.asInstanceOf[IntWritable].get)
-      case ClassTag(_: NullWritable) => obj // TODO: should we clone this ?
-      case _ => WritableUtils.clone(obj.asInstanceOf[Writable], conf) // slower way of cloning.
-    }
-    cloned.asInstanceOf[T]
+  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+    val cloneFunc = classTag[T] match {
+      case ClassTag(_: Text) =>
+        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+      case ClassTag(_: LongWritable) =>
+        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+      case ClassTag(_: IntWritable) =>
+        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+      case ClassTag(_: NullWritable) =>
+        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+      case _ =>
+        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+    }
+    cloneFunc
   }
 
   /** Serialize an object using Java serialization */
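
A minimal, hypothetical usage sketch (not part of the patch) of the refactored Utils.cloneWritables: the ClassTag match is now resolved once to obtain a Writable => T clone function, which the record readers then apply per record, instead of re-matching the tag for every key/value pair. The object, package, and sample values below are illustrative only; Utils is private[spark], so the sketch assumes code living under the org.apache.spark package.

// Hypothetical package so the private[spark] Utils object is visible to this sketch.
package org.apache.spark.sketch

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text, Writable}
import org.apache.spark.util.Utils

object CloneWritablesSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Resolve the clone functions once, as HadoopRDD/NewHadoopRDD now do per partition.
    val keyCloneFunc = Utils.cloneWritables[LongWritable](conf)   // Writable => LongWritable
    val valueCloneFunc = Utils.cloneWritables[Text](conf)         // Writable => Text

    // Apply them per record, mirroring the cloneKeyValues = true path in getNext()/next().
    val key = new LongWritable(42L)
    val value = new Text("some record")
    val cloned = (keyCloneFunc(key.asInstanceOf[Writable]),
      valueCloneFunc(value.asInstanceOf[Writable]))

    // The clones carry the same values as the (reused) Writable instances they copy.
    assert(cloned._1.get == 42L && cloned._2.toString == "some record")
  }
}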