From a9c8d53cfa0bd09565799cec88344b286d7cc436 Mon Sep 17 00:00:00 2001 From: Stephen Haberman <stephen@exigencecorp.com> Date: Tue, 5 Feb 2013 22:14:18 -0600 Subject: [PATCH] Clean up RDDs, mainly to use getSplits. Also made sure clearDependencies() was calling super, to ensure the getSplits/getDependencies vars in the RDD base class get cleaned up. --- core/src/main/scala/spark/RDD.scala | 1 - core/src/main/scala/spark/rdd/BlockRDD.scala | 12 ++++------- .../main/scala/spark/rdd/CartesianRDD.scala | 3 ++- .../main/scala/spark/rdd/CheckpointRDD.scala | 4 +--- .../main/scala/spark/rdd/CoGroupedRDD.scala | 21 +++++++------------ .../main/scala/spark/rdd/CoalescedRDD.scala | 13 ++++++------ core/src/main/scala/spark/rdd/HadoopRDD.scala | 7 ++----- .../main/scala/spark/rdd/NewHadoopRDD.scala | 6 ++---- .../src/main/scala/spark/rdd/SampledRDD.scala | 8 +------ core/src/main/scala/spark/rdd/UnionRDD.scala | 8 ++----- core/src/main/scala/spark/rdd/ZippedRDD.scala | 6 +++--- 11 files changed, 30 insertions(+), 59 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index f0bc85865c..5f99591fd5 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -656,7 +656,6 @@ abstract class RDD[T: ClassManifest]( */ private[spark] def markCheckpointed(checkpointRDD: RDD[_]) { clearDependencies() - dependencies_ = null splits_ = null deps = null // Forget the constructor argument for dependencies too } diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala index 2c022f88e0..4214817c65 100644 --- a/core/src/main/scala/spark/rdd/BlockRDD.scala +++ b/core/src/main/scala/spark/rdd/BlockRDD.scala @@ -11,10 +11,6 @@ private[spark] class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String]) extends RDD[T](sc, Nil) { - @transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => { - new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] - }).toArray - @transient lazy val locations_ = { val blockManager = SparkEnv.get.blockManager /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ @@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St HashMap(blockIds.zip(locations):_*) } - override def getSplits = splits_ + override def getSplits = (0 until blockIds.size).map(i => { + new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] + }).toArray + override def compute(split: Split, context: TaskContext): Iterator[T] = { val blockManager = SparkEnv.get.blockManager @@ -37,8 +36,5 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St override def getPreferredLocations(split: Split) = locations_(split.asInstanceOf[BlockRDDSplit].blockId) - override def clearDependencies() { - splits_ = null - } } diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala index 0f9ca06531..2f572a1941 100644 --- a/core/src/main/scala/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala @@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( val numSplitsInRdd2 = rdd2.splits.size - override def getSplits: Array[Split] = { + override def getSplits = { // create the cross product split val array = new Array[Split](rdd1.splits.size * rdd2.splits.size) for (s1 <- rdd1.splits; s2 <- rdd2.splits) { @@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest]( ) override def clearDependencies() { + super.clearDependencies() rdd1 = null rdd2 = null } diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 96b593ba7c..7cde523f11 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration) - @transient val splits_ : Array[Split] = { + override def getSplits = { val dirContents = fs.listStatus(new Path(checkpointPath)) val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted val numSplits = splitFiles.size @@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri checkpointData = Some(new RDDCheckpointData[T](this)) checkpointData.get.cpFile = Some(checkpointPath) - override def getSplits = splits_ - override def getPreferredLocations(split: Split): Seq[String] = { val status = fs.getFileStatus(new Path(checkpointPath)) val locations = fs.getFileBlockLocations(status, 0, status.getLen) diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index 021118c8ba..d31ce13706 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -43,26 +43,22 @@ private[spark] class CoGroupAggregator class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging { - val aggr = new CoGroupAggregator + private val aggr = new CoGroupAggregator - @transient var deps_ = { - val deps = new ArrayBuffer[Dependency[_]] - for (rdd <- rdds) { + override def getDependencies = { + rdds.map { rdd => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) - deps += new OneToOneDependency(rdd) + new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true) - deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) + new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part) } } - deps.toList } - override def getDependencies = deps_ - - @transient var splits_ : Array[Split] = { + override def getSplits = { val array = new Array[Split](part.numPartitions) for (i <- 0 until array.size) { // Each CoGroupSplit will have a dependency per contributing RDD @@ -79,8 +75,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) array } - override def getSplits = splits_ - override val partitioner = Some(part) override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = { @@ -117,8 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner) } override def clearDependencies() { - deps_ = null - splits_ = null + super.clearDependencies() rdds = null } } diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala index 4c57434b65..a1aa7a30b0 100644 --- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala @@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest]( maxPartitions: Int) extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { + override def getSplits = { val prevSplits = prev.splits if (prevSplits.length < maxPartitions) { prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) } @@ -50,14 +50,13 @@ class CoalescedRDD[T: ClassManifest]( } } - override def getDependencies: Seq[Dependency[_]] = List( - new NarrowDependency(prev) { - def getParents(id: Int): Seq[Int] = - splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices - } - ) + override def getDependencies = Seq(new NarrowDependency(prev) { + def getParents(id: Int): Seq[Int] = + splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices + }) override def clearDependencies() { + super.clearDependencies() prev = null } } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index f547f53812..cd948de967 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -45,10 +45,9 @@ class HadoopRDD[K, V]( extends RDD[(K, V)](sc, Nil) { // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it - val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) - @transient - val splits_ : Array[Split] = { + override def getSplits = { val inputFormat = createInputFormat(conf) val inputSplits = inputFormat.getSplits(conf, minSplits) val array = new Array[Split](inputSplits.size) @@ -63,8 +62,6 @@ class HadoopRDD[K, V]( .asInstanceOf[InputFormat[K, V]] } - override def getSplits = splits_ - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[HadoopSplit] var reader: RecordReader[K, V] = null diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala index c3b155fcbd..2d000f5c68 100644 --- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala @@ -29,7 +29,7 @@ class NewHadoopRDD[K, V]( with HadoopMapReduceUtil { // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it - val confBroadcast = sc.broadcast(new SerializableWritable(conf)) + private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) // private val serializableConf = new SerializableWritable(conf) private val jobtrackerId: String = { @@ -39,7 +39,7 @@ class NewHadoopRDD[K, V]( @transient private val jobId = new JobID(jobtrackerId, id) - @transient private val splits_ : Array[Split] = { + override def getSplits = { val inputFormat = inputFormatClass.newInstance val jobContext = newJobContext(conf, jobId) val rawSplits = inputFormat.getSplits(jobContext).toArray @@ -50,8 +50,6 @@ class NewHadoopRDD[K, V]( result } - override def getSplits = splits_ - override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] { val split = theSplit.asInstanceOf[NewHadoopSplit] val conf = confBroadcast.value.value diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala index e24ad23b21..81626d5009 100644 --- a/core/src/main/scala/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/spark/rdd/SampledRDD.scala @@ -19,13 +19,11 @@ class SampledRDD[T: ClassManifest]( seed: Int) extends RDD[T](prev) { - @transient var splits_ : Array[Split] = { + override def getSplits = { val rg = new Random(seed) firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt)) } - override def getSplits = splits_ - override def getPreferredLocations(split: Split) = firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev) @@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest]( firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac)) } } - - override def clearDependencies() { - splits_ = null - } } diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala index 26a2d511f2..5ac24d2ffc 100644 --- a/core/src/main/scala/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/spark/rdd/UnionRDD.scala @@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest]( @transient var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies - override def getSplits: Array[Split] = { + override def getSplits = { val array = new Array[Split](rdds.map(_.splits.size).sum) var pos = 0 for (rdd <- rdds; split <- rdd.splits) { @@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest]( array } - override def getDependencies: Seq[Dependency[_]] = { + override def getDependencies = { val deps = new ArrayBuffer[Dependency[_]] var pos = 0 for (rdd <- rdds) { @@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest]( override def getPreferredLocations(s: Split): Seq[String] = s.asInstanceOf[UnionSplit[T]].preferredLocations() - - override def clearDependencies() { - rdds = null - } } diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala index e5df6d8c72..a079720a93 100644 --- a/core/src/main/scala/spark/rdd/ZippedRDD.scala +++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala @@ -29,10 +29,9 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( sc: SparkContext, var rdd1: RDD[T], var rdd2: RDD[U]) - extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) - with Serializable { + extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) { - override def getSplits: Array[Split] = { + override def getSplits = { if (rdd1.splits.size != rdd2.splits.size) { throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions") } @@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest]( } override def clearDependencies() { + super.clearDependencies() rdd1 = null rdd2 = null } -- GitLab