Skip to content
Snippets Groups Projects
Commit f2bc7480 authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Add RDD.coalesce.

parent 67df7f2f
No related branches found
No related tags found
No related merge requests found
......@@ -20,6 +20,7 @@ import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
import spark.partial.PartialResult
import spark.rdd.CoalescedRDD
import spark.rdd.CartesianRDD
import spark.rdd.FilteredRDD
import spark.rdd.FlatMappedRDD
......@@ -231,6 +232,12 @@ abstract class RDD[T: ClassManifest](
def distinct(): RDD[T] = distinct(splits.size)
/**
* Return a new RDD that is reduced into `numSplits` partitions.
*/
def coalesce(numSplits: Int = sc.defaultParallelism): RDD[T] =
new CoalescedRDD(this, numSplits)
/**
* Return a sampled subset of this RDD.
*/
......
......@@ -130,6 +130,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends PairFlatMapWorkaround
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
other.classManifest)
/**
* Return a new RDD that is reduced into the default number of partitions.
*/
def coalesce(): RDD[T] = coalesce(rdd.context.defaultParallelism)
/**
* Return a new RDD that is reduced into `numSplits` partitions.
*/
def coalesce(numSplits: Int): RDD[T] = rdd.coalesce(numSplits)
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
......
......@@ -114,12 +114,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("CoalescedRDD") {
testCheckpointing(new CoalescedRDD(_, 2))
testCheckpointing(_.coalesce(2))
// Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
// Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
// so only the RDD will reduce in serialized size, not the splits.
testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
testParentCheckpointing(_.coalesce(2), true, false)
// Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
// the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
......
......@@ -122,7 +122,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = new CoalescedRDD(data, 2)
val coalesced1 = data.coalesce(2)
assert(coalesced1.collect().toList === (1 to 10).toList)
assert(coalesced1.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
......@@ -133,19 +133,19 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
List(5, 6, 7, 8, 9))
val coalesced2 = new CoalescedRDD(data, 3)
val coalesced2 = data.coalesce(3)
assert(coalesced2.collect().toList === (1 to 10).toList)
assert(coalesced2.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
val coalesced3 = new CoalescedRDD(data, 10)
val coalesced3 = data.coalesce(10)
assert(coalesced3.collect().toList === (1 to 10).toList)
assert(coalesced3.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
val coalesced4 = new CoalescedRDD(data, 20)
val coalesced4 = data.coalesce(20)
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment