Skip to content
Snippets Groups Projects
Commit f9af9cee authored by Reynold Xin's avatar Reynold Xin
Browse files

Moved PruneDependency into PartitionPruningRDD.scala.

parent 6289d965
No related branches found
No related tags found
No related merge requests found
...@@ -61,25 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int) ...@@ -61,25 +61,3 @@ class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
} }
} }
} }
/**
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
.zipWithIndex
.map { case(split, idx) => new PruneDependency.PartitionPruningRDDSplit(idx, split) : Split }
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
}
object PruneDependency {
class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
override val index = idx
}
}
package spark.rdd package spark.rdd
import spark.{PruneDependency, RDD, SparkEnv, Split, TaskContext} import spark.{NarrowDependency, RDD, SparkEnv, Split, TaskContext}
class PartitionPruningRDDSplit(idx: Int, val parentSplit: Split) extends Split {
override val index = idx
}
/**
* Represents a dependency between the PartitionPruningRDD and its parent. In this
* case, the child RDD contains a subset of partitions of the parents'.
*/
class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
extends NarrowDependency[T](rdd) {
@transient
val partitions: Array[Split] = rdd.splits.filter(s => partitionFilterFunc(s.index))
.zipWithIndex.map { case(split, idx) => new PartitionPruningRDDSplit(idx, split) : Split }
override def getParents(partitionId: Int) = List(partitions(partitionId).index)
}
/** /**
...@@ -15,10 +35,8 @@ class PartitionPruningRDD[T: ClassManifest]( ...@@ -15,10 +35,8 @@ class PartitionPruningRDD[T: ClassManifest](
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) { extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
override def compute(split: Split, context: TaskContext) = firstParent[T].iterator( override def compute(split: Split, context: TaskContext) = firstParent[T].iterator(
split.asInstanceOf[PruneDependency.PartitionPruningRDDSplit].parentSplit, context) split.asInstanceOf[PartitionPruningRDDSplit].parentSplit, context)
override protected def getSplits = override protected def getSplits =
getDependencies.head.asInstanceOf[PruneDependency[T]].partitions getDependencies.head.asInstanceOf[PruneDependency[T]].partitions
override val partitioner = firstParent[T].partitioner
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment