From cec427e777fe2d6ef0dab285a1f4289d2ae4f89e Mon Sep 17 00:00:00 2001 From: Matei Zaharia <matei@eecs.berkeley.edu> Date: Sun, 22 May 2011 17:12:29 -0700 Subject: [PATCH] Fixed a bug with preferred locations having changed meaning in new RDDs --- core/src/main/scala/spark/RDD.scala | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 45dcad54b4..6334896cb6 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -20,11 +20,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) { // Methods that must be implemented by subclasses def splits: Array[Split] def compute(split: Split): Iterator[T] - def preferredLocations(split: Split): Seq[String] val dependencies: List[Dependency[_]] // Optionally overridden by subclasses to specify how they are partitioned val partitioner: Option[Partitioner] = None + + // Optionally overridden by subclasses to specify placement preferences + def preferredLocations(split: Split): Seq[String] = Nil def context = sc @@ -152,7 +154,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => U) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).map(f) } @@ -161,7 +162,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest]( prev: RDD[T], f: T => Traversable[U]) extends RDD[U](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator } @@ -170,7 +170,6 @@ class FilteredRDD[T: ClassManifest]( prev: RDD[T], f: T => Boolean) extends RDD[T](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = prev.iterator(split).filter(f) } @@ -178,7 +177,6 @@ extends RDD[T](prev.context) { class SplitRDD[T: ClassManifest](prev: RDD[T]) extends RDD[Array[T]](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray)) } @@ -303,7 +301,6 @@ class MappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = @@ -314,7 +311,6 @@ class FlatMappedValuesRDD[K, V, U]( prev: RDD[(K, V)], f: V => Traversable[U]) extends RDD[(K, U)](prev.context) { override def splits = prev.splits - override def preferredLocations(split: Split) = prev.preferredLocations(split) override val dependencies = List(new OneToOneDependency(prev)) override val partitioner = prev.partitioner override def compute(split: Split) = { -- GitLab