Skip to content
Snippets Groups Projects
Commit 143ef4f9 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Added a CoalescedRDD class for reducing the number of partitions in an RDD.

parent c45758dd
No related branches found
No related tags found
No related merge requests found
package spark
private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
* this RDD computes zero or more of the parent ones. Will produce exactly `maxPartitions` if the
* parent had more than this many partitions, or fewer if the parent had fewer.
*
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
*/
class CoalescedRDD[T: ClassManifest](prev: RDD[T], maxPartitions: Int)
extends RDD[T](prev.context)
with Logging {
@transient val splits_ : Array[Split] = {
val prevSplits = prev.splits
if (prevSplits.length < maxPartitions) {
prevSplits.zipWithIndex.map{ case (s, idx) => new CoalescedRDDSplit(idx, Array(s)) }
} else {
(0 until maxPartitions).map { i =>
val rangeStart = (i * prevSplits.length) / maxPartitions
val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
new CoalescedRDDSplit(i, prevSplits.slice(rangeStart, rangeEnd))
}.toArray
}
}
override def splits = splits_
override def compute(split: Split): Iterator[T] = {
split.asInstanceOf[CoalescedRDDSplit].parents.iterator.flatMap {
parentSplit => prev.iterator(parentSplit)
}
}
val dependencies = List(
new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
splits(id).asInstanceOf[CoalescedRDDSplit].parents.map(_.index)
}
)
}
......@@ -71,4 +71,35 @@ class RDDSuite extends FunSuite with BeforeAndAfter {
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).flatMap(x => 1 to x).checkpoint()
assert(rdd.collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
}
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = new CoalescedRDD(data, 2)
assert(coalesced1.collect().toList === (1 to 10).toList)
assert(coalesced1.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
// Check that the narrow dependency is also specified correctly
assert(coalesced1.dependencies.head.getParents(0).toList === List(0, 1, 2, 3, 4))
assert(coalesced1.dependencies.head.getParents(1).toList === List(5, 6, 7, 8, 9))
val coalesced2 = new CoalescedRDD(data, 3)
assert(coalesced2.collect().toList === (1 to 10).toList)
assert(coalesced2.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
val coalesced3 = new CoalescedRDD(data, 10)
assert(coalesced3.collect().toList === (1 to 10).toList)
assert(coalesced3.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = new CoalescedRDD(data, 20)
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment