Skip to content
Snippets Groups Projects
Commit cec427e7 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Fixed a bug with preferred locations having changed meaning in new RDDs

parent 4c888b29
No related branches found
No related tags found
No related merge requests found
......@@ -20,11 +20,13 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
// Methods that must be implemented by subclasses
def splits: Array[Split]
def compute(split: Split): Iterator[T]
def preferredLocations(split: Split): Seq[String]
val dependencies: List[Dependency[_]]
// Optionally overridden by subclasses to specify how they are partitioned
val partitioner: Option[Partitioner] = None
// Optionally overridden by subclasses to specify placement preferences
def preferredLocations(split: Split): Seq[String] = Nil
def context = sc
......@@ -152,7 +154,6 @@ class MappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => U)
extends RDD[U](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).map(f)
}
......@@ -161,7 +162,6 @@ class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
prev: RDD[T], f: T => Traversable[U])
extends RDD[U](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).toStream.flatMap(f).iterator
}
......@@ -170,7 +170,6 @@ class FilteredRDD[T: ClassManifest](
prev: RDD[T], f: T => Boolean)
extends RDD[T](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = prev.iterator(split).filter(f)
}
......@@ -178,7 +177,6 @@ extends RDD[T](prev.context) {
class SplitRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override def compute(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
}
......@@ -303,7 +301,6 @@ class MappedValuesRDD[K, V, U](
prev: RDD[(K, V)], f: V => U)
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
override def compute(split: Split) =
......@@ -314,7 +311,6 @@ class FlatMappedValuesRDD[K, V, U](
prev: RDD[(K, V)], f: V => Traversable[U])
extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override def preferredLocations(split: Split) = prev.preferredLocations(split)
override val dependencies = List(new OneToOneDependency(prev))
override val partitioner = prev.partitioner
override def compute(split: Split) = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment