Skip to content
Snippets Groups Projects
Commit df5fd352 authored by Stephen Haberman's avatar Stephen Haberman
Browse files

Add better docs for coalesce.

Include the useful tip that if shuffle=true, coalesce can actually
increase the number of partitions.

This makes coalesce more like a generic `RDD.repartition` operation.

(Ideally this `RDD.repartition` could automatically choose either a coalesce or
a shuffle if numPartitions was either less than or greater than, respectively,
the current number of partitions.)
parent 04cfb3aa
No related branches found
No related tags found
No related merge requests found
......@@ -267,6 +267,23 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*
* This results in a narrow dependency, e.g. if you go from 1000 partitions
* to 100 partitions, there will not be a shuffle, instead each of the 100
* new partitions will claim 10 of the current partitions.
*
* However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
* this may result in your computation taking place on fewer nodes than
* you like (e.g. one node in the case of numPartitions = 1). To avoid this,
* you can pass shuffle = true. This will add a shuffle step, but means the
* current upstream partitions will be executed in parallel (per whatever
* the current partitioning is).
*
* Note: With shuffle = true, you can actually coalesce to a larger number
* of partitions. This is useful if you have a small number of partitions,
* say 100, potentially with a few partitions being abnormally large. Calling
* coalecse(1000, shuffle = true) will result in 1000 partitions with the
* data evenly distributed into each partition.
*/
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
......
......@@ -140,7 +140,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(rdd.union(emptyKv).collect().size === 2)
}
test("cogrouped RDDs") {
test("coalesced RDDs") {
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = data.coalesce(2)
......@@ -175,8 +175,14 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced5 = data.coalesce(1, shuffle = true)
assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
// when shuffling, we can increase the number of partitions
val coalesced6 = data.coalesce(20, shuffle = true)
assert(coalesced6.partitions.size === 20)
assert(coalesced6.collect().toList === (1 to 10).toList)
}
test("cogrouped RDDs with locality") {
test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
......@@ -197,11 +203,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val coalesced4 = data.coalesce(20)
val listOfLists = coalesced4.glom().collect().map(_.toList).toList
val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
assert( sortedList === (1 to 9).
assert(sortedList === (1 to 9).
map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
}
test("cogrouped RDDs with locality, large scale (10K partitions)") {
test("coalesced RDDs with locality, large scale (10K partitions)") {
// large scale experiment
import collection.mutable
val rnd = scala.util.Random
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment