Commit ee6e7f9b authored by Reynold Xin

Merge pull request #359 from ScrapCodes/clone-writables

We clone Hadoop keys and values by default, and reuse the objects only when asked to.

We clone the most common Writable types with a specialized fast path and fall back to WritableUtils.clone otherwise. The intention is to optimize: for NullWritable there is nothing to clone, and for LongWritable, IntWritable and Text, constructing a new object around the value should be faster than a generic copy of the object.
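As a rough illustration of the fast path versus the fallback (a sketch with made-up values, not code from this commit): for a LongWritable it is enough to wrap the primitive in a fresh instance, while the generic fallback round-trips the Writable through its own serialization.

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.io.{LongWritable, WritableUtils}

  val conf = new Configuration()
  val src = new LongWritable(42L)

  // Fast path: build a new instance directly from the primitive value.
  val fast = new LongWritable(src.get)

  // Generic fallback: WritableUtils.clone serializes and deserializes the Writable.
  val slow: LongWritable = WritableUtils.clone(src, conf)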

Another way to do this PR would be to let the caller choose separately whether to clone keys and whether to clone values, but I could not think of a use case for that beyond one of them being a NullWritable, which is already handled (cloning a NullWritable is a no-op). So the extra flag seemed unnecessary.
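For context, a minimal usage sketch of the new parameter (the SparkContext value `sc` and the input path are placeholders): cloning is the default, and a caller can pass cloneKeyValues = false when each record is consumed immediately, e.g. mapped to an immutable type before anything is cached or collected.

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapred.TextInputFormat

  // Default behaviour after this change: keys and values are cloned, so holding on
  // to the raw pairs (e.g. caching them) is safe.
  val cloned = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://example/input")

  // Opt out of cloning when records are consumed right away; the Writable instances
  // returned by the RecordReader are reused across records in this mode.
  val lengths = sc
    .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://example/input", cloneKeyValues = false)
    .map { case (_, line) => line.getLength }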
parents 4216178d 59b03e01
SparkContext.scala
@@ -349,25 +349,27 @@ class SparkContext(
    * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
    * etc).
    */
-  def hadoopRDD[K, V](
+  def hadoopRDD[K: ClassTag, V: ClassTag](
       conf: JobConf,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
+  def hadoopFile[K: ClassTag, V: ClassTag](
       path: String,
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
+      minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -379,7 +381,8 @@ class SparkContext(
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   /**
@@ -390,14 +393,15 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
-      : RDD[(K, V)] = {
+  def hadoopFile[K, V, F <: InputFormat[K, V]]
+      (path: String, minSplits: Int, cloneKeyValues: Boolean = true)
+      (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     hadoopFile(path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
       vm.runtimeClass.asInstanceOf[Class[V]],
-      minSplits)
+      minSplits,
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
@@ -408,61 +412,67 @@ class SparkContext(
    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
    * }}}
    */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
+  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
+    hadoopFile[K, V, F](path, defaultMinSplits, cloneKeyValues)
 
   /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
+      (path: String, cloneKeyValues: Boolean = true)
       (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
     newAPIHadoopFile(
       path,
       fm.runtimeClass.asInstanceOf[Class[F]],
       km.runtimeClass.asInstanceOf[Class[K]],
-      vm.runtimeClass.asInstanceOf[Class[V]])
+      vm.runtimeClass.asInstanceOf[Class[V]],
+      cloneKeyValues = cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       path: String,
       fClass: Class[F],
       kClass: Class[K],
       vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
+      conf: Configuration = hadoopConfiguration,
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
     val job = new NewHadoopJob(conf)
     NewFileInputFormat.addInputPath(job, new Path(path))
     val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
+    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneKeyValues)
   }
 
   /**
    * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
    * and extra configuration options to pass to the input format.
    */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+  def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
       conf: Configuration = hadoopConfiguration,
       fClass: Class[F],
       kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+      vClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] = {
+    new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String,
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int
+      minSplits: Int,
+      cloneKeyValues: Boolean = true
       ): RDD[(K, V)] = {
     val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
+    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneKeyValues)
   }
 
   /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
+  def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
+      cloneKeyValues: Boolean = true): RDD[(K, V)] =
+    sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneKeyValues)
 
   /**
    * Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -480,8 +490,8 @@ class SparkContext(
    * for the appropriate type. In addition, we pass the converter a ClassTag of its type to
    * allow it to figure out the Writable class to use in the subclass case.
    */
-  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassTag[K], vm: ClassTag[V],
+  def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits,
+      cloneKeyValues: Boolean = true) (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
       : RDD[(K, V)] = {
     val kc = kcf()
@@ -489,7 +499,7 @@ class SparkContext(
     val format = classOf[SequenceFileInputFormat[Writable, Writable]]
     val writables = hadoopFile(path, format,
         kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
+        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneKeyValues)
     writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
   }
...
HadoopRDD.scala
@@ -19,7 +19,10 @@ package org.apache.spark.rdd
 import java.io.EOFException
-import org.apache.hadoop.mapred.FileInputFormat
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
@@ -31,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
+import org.apache.spark.util.Utils.cloneWritables
 
 /**
@@ -62,14 +65,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
  * @param valueClass Class of the value associated with the inputFormatClass.
  * @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
  */
-class HadoopRDD[K, V](
+class HadoopRDD[K: ClassTag, V: ClassTag](
     sc: SparkContext,
     broadcastedConf: Broadcast[SerializableWritable[Configuration]],
     initLocalJobConfFuncOpt: Option[JobConf => Unit],
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    minSplits: Int)
+    minSplits: Int,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil) with Logging {
 
   def this(
@@ -78,7 +82,8 @@ class HadoopRDD[K, V](
       inputFormatClass: Class[_ <: InputFormat[K, V]],
       keyClass: Class[K],
       valueClass: Class[V],
-      minSplits: Int) = {
+      minSplits: Int,
+      cloneKeyValues: Boolean) = {
     this(
       sc,
       sc.broadcast(new SerializableWritable(conf))
@@ -87,7 +92,8 @@ class HadoopRDD[K, V](
       inputFormatClass,
       keyClass,
       valueClass,
-      minSplits)
+      minSplits,
+      cloneKeyValues)
   }
 
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -158,10 +164,10 @@ class HadoopRDD[K, V](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback{ () => closeIfNeeded() }
       val key: K = reader.createKey()
+      val keyCloneFunc = cloneWritables[K](getConf)
       val value: V = reader.createValue()
+      val valueCloneFunc = cloneWritables[V](getConf)
 
       override def getNext() = {
         try {
           finished = !reader.next(key, value)
@@ -169,7 +175,12 @@ class HadoopRDD[K, V](
           case eof: EOFException =>
             finished = true
         }
-        (key, value)
+        if (cloneKeyValues) {
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
+        } else {
+          (key, value)
+        }
       }
 
       override def close() {
...
NewHadoopRDD.scala
@@ -20,11 +20,14 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.util.Utils.cloneWritables
 
 private[spark]
@@ -36,12 +39,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
   override def hashCode(): Int = (41 * (41 + rddId) + index)
 }
 
-class NewHadoopRDD[K, V](
+class NewHadoopRDD[K: ClassTag, V: ClassTag](
     sc : SparkContext,
     inputFormatClass: Class[_ <: InputFormat[K, V]],
     keyClass: Class[K],
     valueClass: Class[V],
-    @transient conf: Configuration)
+    @transient conf: Configuration,
+    cloneKeyValues: Boolean)
   extends RDD[(K, V)](sc, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
@@ -88,7 +92,8 @@ class NewHadoopRDD[K, V](
       // Register an on-task-completion callback to close the input stream.
       context.addOnCompleteCallback(() => close())
+      val keyCloneFunc = cloneWritables[K](conf)
+      val valueCloneFunc = cloneWritables[V](conf)
       var havePair = false
       var finished = false
@@ -105,7 +110,14 @@ class NewHadoopRDD[K, V](
           throw new java.util.NoSuchElementException("End of stream")
         }
         havePair = false
-        (reader.getCurrentKey, reader.getCurrentValue)
+        val key = reader.getCurrentKey
+        val value = reader.getCurrentValue
+        if (cloneKeyValues) {
+          (keyCloneFunc(key.asInstanceOf[Writable]),
+            valueCloneFunc(value.asInstanceOf[Writable]))
+        } else {
+          (key, value)
+        }
       }
 
       private def close() {
...
Utils.scala
@@ -26,23 +26,47 @@ import scala.collection.JavaConversions._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
+import org.apache.hadoop.io._
 
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkConf, SparkContext, SparkException, Logging}
+import org.apache.spark.{SparkConf, SparkException, Logging}
 
 /**
  * Various utility methods used by Spark.
  */
 private[spark] object Utils extends Logging {
+
+  /**
+   * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
+   * intention is to optimize, for example for NullWritable there is no need and for Long, int and
+   * String creating a new object with value set would be faster.
+   */
+  def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
+    val cloneFunc = classTag[T] match {
+      case ClassTag(_: Text) =>
+        (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
+      case ClassTag(_: LongWritable) =>
+        (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
+      case ClassTag(_: IntWritable) =>
+        (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
+      case ClassTag(_: NullWritable) =>
+        (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
+      case _ =>
+        (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
+    }
+    cloneFunc
+  }
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
...