diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 45dcad54b452339a9c0ed7a452c3b7139552733b..6334896cb6354bf591305ca99bc5c20ff5e7c74f 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,11 +20,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] - def preferredLocations(split: Split): Seq[String] val dependencies: List[Dependency[_]] // Optionally overridden by subclasses to specify how they are partitioned val partitioner: Option[Partitioner] = None + + // Optionally overridden by subclasses to specify placement preferences + def preferredLocations(split: Split): Seq[String] = Nil def context = sc @@ -152,7 +154,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => U) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).map(f) } @@ -161,7 +162,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => Traversable[U]) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator } @@ -170,7 +170,6 @@ class FilteredRDD[T: ClassManifest]( prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).filter(f) } @@ -178,7 +177,6 @@ extends RDD[T](prev.context) { class SplitRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray)) } @@ -303,7 +301,6 @@ class MappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = @@ -314,7 +311,6 @@ class FlatMappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => Traversable[U]) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = {