Skip to content
Snippets Groups Projects
Commit a9c8d53c authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Clean up RDDs, mainly to use getSplits.

Also made sure clearDependencies() was calling super, to ensure
the getSplits/getDependencies vars in the RDD base class get
cleaned up.
parent f4d43cb4
No related branches found
No related tags found
No related merge requests found
......@@ -656,7 +656,6 @@ abstract class RDD[T: ClassManifest](
*/
private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
clearDependencies()
dependencies_ = null
splits_ = null
deps = null // Forget the constructor argument for dependencies too
}
......
......@@ -11,10 +11,6 @@ private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
@transient var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
}).toArray
@transient lazy val locations_ = {
val blockManager = SparkEnv.get.blockManager
/*val locations = blockIds.map(id => blockManager.getLocations(id))*/
......@@ -22,7 +18,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
HashMap(blockIds.zip(locations):_*)
}
override def getSplits = splits_
override def getSplits = (0 until blockIds.size).map(i => {
new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
}).toArray
override def compute(split: Split, context: TaskContext): Iterator[T] = {
val blockManager = SparkEnv.get.blockManager
......@@ -37,8 +36,5 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
override def getPreferredLocations(split: Split) =
locations_(split.asInstanceOf[BlockRDDSplit].blockId)
override def clearDependencies() {
splits_ = null
}
}
......@@ -35,7 +35,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
val numSplitsInRdd2 = rdd2.splits.size
override def getSplits: Array[Split] = {
override def getSplits = {
// create the cross product split
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
......@@ -66,6 +66,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
)
override def clearDependencies() {
super.clearDependencies()
rdd1 = null
rdd2 = null
}
......
......@@ -20,7 +20,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
@transient val splits_ : Array[Split] = {
override def getSplits = {
val dirContents = fs.listStatus(new Path(checkpointPath))
val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
val numSplits = splitFiles.size
......@@ -34,8 +34,6 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
checkpointData = Some(new RDDCheckpointData[T](this))
checkpointData.get.cpFile = Some(checkpointPath)
override def getSplits = splits_
override def getPreferredLocations(split: Split): Seq[String] = {
val status = fs.getFileStatus(new Path(checkpointPath))
val locations = fs.getFileBlockLocations(status, 0, status.getLen)
......
......@@ -43,26 +43,22 @@ private[spark] class CoGroupAggregator
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
val aggr = new CoGroupAggregator
private val aggr = new CoGroupAggregator
@transient var deps_ = {
val deps = new ArrayBuffer[Dependency[_]]
for (rdd <- rdds) {
override def getDependencies = {
rdds.map { rdd =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
deps += new OneToOneDependency(rdd)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
}
}
deps.toList
}
override def getDependencies = deps_
@transient var splits_ : Array[Split] = {
override def getSplits = {
val array = new Array[Split](part.numPartitions)
for (i <- 0 until array.size) {
// Each CoGroupSplit will have a dependency per contributing RDD
......@@ -79,8 +75,6 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
array
}
override def getSplits = splits_
override val partitioner = Some(part)
override def compute(s: Split, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
......@@ -117,8 +111,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
override def clearDependencies() {
deps_ = null
splits_ = null
super.clearDependencies()
rdds = null
}
}
......@@ -31,7 +31,7 @@ class CoalescedRDD[T: ClassManifest](
maxPartitions: Int)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
override def getSplits: Array[Split] = {
override def getSplits = {
val prevSplits = prev.splits
if (prevSplits.length < maxPartitions) {
prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) }
......@@ -50,14 +50,13 @@ class CoalescedRDD[T: ClassManifest](
}
}
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
}
)
override def getDependencies = Seq(new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
})
override def clearDependencies() {
super.clearDependencies()
prev = null
}
}
......@@ -45,10 +45,9 @@ class HadoopRDD[K, V](
extends RDD[(K, V)](sc, Nil) {
// A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
@transient
val splits_ : Array[Split] = {
override def getSplits = {
val inputFormat = createInputFormat(conf)
val inputSplits = inputFormat.getSplits(conf, minSplits)
val array = new Array[Split](inputSplits.size)
......@@ -63,8 +62,6 @@ class HadoopRDD[K, V](
.asInstanceOf[InputFormat[K, V]]
}
override def getSplits = splits_
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopSplit]
var reader: RecordReader[K, V] = null
......
......@@ -29,7 +29,7 @@ class NewHadoopRDD[K, V](
with HadoopMapReduceUtil {
// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
val confBroadcast = sc.broadcast(new SerializableWritable(conf))
private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
// private val serializableConf = new SerializableWritable(conf)
private val jobtrackerId: String = {
......@@ -39,7 +39,7 @@ class NewHadoopRDD[K, V](
@transient private val jobId = new JobID(jobtrackerId, id)
@transient private val splits_ : Array[Split] = {
override def getSplits = {
val inputFormat = inputFormatClass.newInstance
val jobContext = newJobContext(conf, jobId)
val rawSplits = inputFormat.getSplits(jobContext).toArray
......@@ -50,8 +50,6 @@ class NewHadoopRDD[K, V](
result
}
override def getSplits = splits_
override def compute(theSplit: Split, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopSplit]
val conf = confBroadcast.value.value
......
......@@ -19,13 +19,11 @@ class SampledRDD[T: ClassManifest](
seed: Int)
extends RDD[T](prev) {
@transient var splits_ : Array[Split] = {
override def getSplits = {
val rg = new Random(seed)
firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
}
override def getSplits = splits_
override def getPreferredLocations(split: Split) =
firstParent[T].preferredLocations(split.asInstanceOf[SampledRDDSplit].prev)
......@@ -48,8 +46,4 @@ class SampledRDD[T: ClassManifest](
firstParent[T].iterator(split.prev, context).filter(x => (rand.nextDouble <= frac))
}
}
override def clearDependencies() {
splits_ = null
}
}
......@@ -28,7 +28,7 @@ class UnionRDD[T: ClassManifest](
@transient var rdds: Seq[RDD[T]])
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies
override def getSplits: Array[Split] = {
override def getSplits = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0
for (rdd <- rdds; split <- rdd.splits) {
......@@ -38,7 +38,7 @@ class UnionRDD[T: ClassManifest](
array
}
override def getDependencies: Seq[Dependency[_]] = {
override def getDependencies = {
val deps = new ArrayBuffer[Dependency[_]]
var pos = 0
for (rdd <- rdds) {
......@@ -53,8 +53,4 @@ class UnionRDD[T: ClassManifest](
override def getPreferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations()
override def clearDependencies() {
rdds = null
}
}
......@@ -29,10 +29,9 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])
extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2)))
with Serializable {
extends RDD[(T, U)](sc, List(new OneToOneDependency(rdd1), new OneToOneDependency(rdd2))) {
override def getSplits: Array[Split] = {
override def getSplits = {
if (rdd1.splits.size != rdd2.splits.size) {
throw new IllegalArgumentException("Can't zip RDDs with unequal numbers of partitions")
}
......@@ -54,6 +53,7 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def clearDependencies() {
super.clearDependencies()
rdd1 = null
rdd2 = null
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment