Skip to content
Snippets Groups Projects
Commit 0e2adecd authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Simplified UnionRDD slightly and added a SparkContext.union method for...

Simplified UnionRDD slightly and added a SparkContext.union method for efficiently union-ing a large number of RDDs
parent 166d9f91
No related branches found
No related tags found
No related merge requests found
......@@ -82,11 +82,10 @@ abstract class RDD[T: ClassManifest](
try { map(x => 1L).reduce(_+_) }
catch { case e: UnsupportedOperationException => 0L }
def union(other: RDD[T]) = new UnionRDD(sc, this, other)
def union(other: RDD[T]) = new UnionRDD(sc, Array(this, other))
def cartesian[U: ClassManifest](other: RDD[U]) = new CartesianRDD(sc, this, other)
def ++(other: RDD[T]) = this.union(other)
}
@serializable
......@@ -268,36 +267,27 @@ private object CachedRDD {
}
@serializable
abstract class UnionSplit[T: ClassManifest] extends Split {
def iterator(): Iterator[T]
def preferredLocations(): Seq[String]
def getId(): String
}
@serializable
class UnionSplitImpl[T: ClassManifest](
rdd: RDD[T], split: Split)
extends UnionSplit[T] {
override def iterator() = rdd.iterator(split)
override def preferredLocations() = rdd.preferredLocations(split)
override def getId() =
"UnionSplitImpl(" + split.getId() + ")"
class UnionSplit[T: ClassManifest](rdd: RDD[T], split: Split)
extends Split {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override def getId() = "UnionSplit(" + split.getId() + ")"
}
@serializable
class UnionRDD[T: ClassManifest](
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[T])
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
extends RDD[T](sc) {
@transient val splits_ : Array[UnionSplit[T]] = {
val a1 = rdd1.splits.map(s => new UnionSplitImpl(rdd1, s))
val a2 = rdd2.splits.map(s => new UnionSplitImpl(rdd2, s))
(a1 ++ a2).toArray
@transient val splits_ : Array[Split] = {
val splits: Seq[Split] =
for (rdd <- rdds; split <- rdd.splits)
yield new UnionSplit(rdd, split)
splits.toArray
}
override def splits = splits_.asInstanceOf[Array[Split]]
override def splits = splits_
override def iterator(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator()
override def iterator(s: Split): Iterator[T] =
s.asInstanceOf[UnionSplit[T]].iterator()
override def preferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations()
......
......@@ -33,13 +33,17 @@ extends Logging {
// Methods for creating RDDs
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int) =
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int): RDD[T] =
new ParallelArray[T](this, seq, numSlices)
def parallelize[T: ClassManifest](seq: Seq[T]): ParallelArray[T] =
def parallelize[T: ClassManifest](seq: Seq[T]): RDD[T] =
parallelize(seq, scheduler.numCores)
def textFile(path: String) = new HdfsTextFile(this, path)
def textFile(path: String): RDD[String] =
new HdfsTextFile(this, path)
def union[T: ClassManifest](rdds: RDD[T]*): RDD[T] =
new UnionRDD(this, rdds)
// Methods for creating shared variables
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment