From f9af9cee6fed9c6af896fb92556ad4f48c7f8e64 Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@cs.berkeley.edu> Date: Fri, 1 Feb 2013 00:02:46 -0800 Subject: [PATCH] Moved PruneDependency into PartitionPruningRDD.scala. --- core/src/main/scala/spark/Dependency.scala | 22 ---------------- .../scala/spark/rdd/PartitionPruningRDD.scala | 26 ++++++++++++++++--- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index 827eac850a..5eea907322 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -61,25 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) } } } - - -/** - * Represents a dependency between the PartitionPruningRDD and its parent. In this - * case, the child RDD contains a subset of partitions of the parents'. - */ -class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean) - extends NarrowDependency[T](rdd) { - - @transient - val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index)) - .zipWithIndex - .map { case(split, idx) => new PruneDependency.PartitionPruningRDDSplit(idx, split) : Split } - - override def getParents(partitionId: Int) = List(partitions(partitionId).index) -} - -object PruneDependency { - class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split { - override val index = idx - } -} diff --git a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala index 3756870fac..a50ce75171 100644 --- a/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala +++ b/core/src/main/scala/spark/rdd/PartitionPruningRDD.scala @@ -1,6 +1,26 @@ package spark.rdd -import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext} +import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext} + + +class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split { + override val index = idx +} + + +/** + * Represents a dependency between the PartitionPruningRDD and its parent. In this + * case, the child RDD contains a subset of partitions of the parents'. + */ +class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean) + extends NarrowDependency[T](rdd) { + + @transient + val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index)) + .zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split } + + override def getParents(partitionId: Int) = List(partitions(partitionId).index) +} /** @@ -15,10 +35,8 @@ class PartitionPruningRDD[T: ClassManifest]( extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) { override def compute(split: Split, context: TaskContext) = firstParent[T].iterator( - split.asInstanceOf[PruneDependency.PartitionPruningRDDSplit].parentSplit, context) + split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context) override protected def getSplits = getDependencies.head.asInstanceOf[PruneDependency[T]].partitions - - override val partitioner = firstParent[T].partitioner } -- GitLab