Commit 5b08ee63 authored by Thomas Graves, committed by Davies Liu

[SPARK-15671] performance regression CoalesceRDD.pickBin with large #…

I was running a 15TB join job with 202000 partitions. It looks like the changes I made to CoalesceRDD in pickBin() are really slow with that many partitions; the array filter with that many elements just takes too long.
It took about an hour to pick bins for all the partitions.
original change:
https://github.com/apache/spark/commit/83ee92f60345f016a390d61a82f1d924f64ddf90
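
To make the cost concrete, here is a minimal, self-contained sketch (not Spark code; the collection shape mirrors PartitionLocations.partsWithLocs, but the sizes, hostnames, and the lookup-map alternative are illustrative): filtering the full (host, partition) list once per partition is O(n^2) overall, while one hash lookup per partition is O(n).

import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of the pickBin regression. partsWithLocs mirrors the
// shape of PartitionLocations.partsWithLocs: (host, partition) pairs, with
// Int standing in for Partition here.
object PickBinCostSketch {
  def main(args: Array[String]): Unit = {
    val n = 20000 // the reported job had 202000 partitions; scaled down to stay runnable
    val partsWithLocs = ArrayBuffer[(String, Int)]()
    (0 until n).foreach(p => partsWithLocs += ((s"host-${p % 100}", p)))

    // regressed path: one full scan of partsWithLocs per partition -> O(n^2)
    var t0 = System.nanoTime()
    var sink = 0L
    (0 until n).foreach { p =>
      sink += partsWithLocs.filter(_._2 == p).map(_._1).size
    }
    println(s"filter per partition: ${(System.nanoTime() - t0) / 1e9} s ($sink hits)")

    // lookup-based alternative: one hash lookup per partition -> O(n)
    val byPartition: Map[Int, Seq[String]] =
      partsWithLocs.groupBy(_._2).map { case (p, hs) => (p, hs.map(_._1).toSeq) }
    t0 = System.nanoTime()
    (0 until n).foreach(p => sink += byPartition(p).size)
    println(s"lookup per partition: ${(System.nanoTime() - t0) / 1e9} s ($sink hits)")
  }
}

The patch itself does not build such a map; it reverts to asking the DAGScheduler directly for each partition's current preferred locations (currPrefLocs in the diff below), which is likewise one cheap call per partition.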

Just reverting the pickBin code back to use currPrefLocs fixes the issue.

After reverting the pickBin code, the coalesce takes about 10 seconds, so for now it makes sense to revert those changes; we can look at further optimizations later.

Tested this via the RDDSuite unit tests and by manually running the very large job.
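
For illustration, a hypothetical local reproduction sketch (not the RDDSuite test; the sizes, hostnames, and object names here are made up, and the 202000-partition figure above came from the real job). makeRDD's (element, preferredLocations) overload gives every partition a preferred host, which populates partsWithLocs and so exercises the slow path in pickBin:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local reproduction sketch, not the test shipped with the patch.
object CoalesceReproSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SPARK-15671-repro").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val n = 10000 // scaled down from the reported 202000 partitions for a laptop
    // One element per partition, each with a preferred host, so the
    // coalescer sees locality information for every parent partition.
    val data = (0 until n).map(i => (i, Seq(s"host-${i % 100}")))
    val rdd = sc.makeRDD(data)
    val t0 = System.nanoTime()
    // coalesce without shuffle runs DefaultPartitionCoalescer, which calls
    // pickBin for the parent partitions; .partitions forces the planning.
    val coalesced = rdd.coalesce(500)
    println(s"${coalesced.partitions.length} groups, " +
      s"planning took ${(System.nanoTime() - t0) / 1e9} s")
    sc.stop()
  }
}

With the regressed code, planning time grows roughly quadratically in the partition count; with this revert it should stay in the seconds range even at the reported scale.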

Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>

Closes #13443 from tgravescs/SPARK-15671.
parent 2402b914
core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
   var noLocality = true // if true if no preferredLocations exists for parent RDD
 
+  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
+    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+  }
+
   class PartitionLocations(prev: RDD[_]) {
 
     // contains all the partitions from the previous RDD that don't have preferred locations
@@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
       // first get the locations for each partition, only do this once since it can be expensive
       prev.partitions.foreach(p => {
-        val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+        val locs = currPrefLocs(p, prev)
         if (locs.size > 0) {
           tmpPartsWithLocs.put(p, locs)
         } else {
@@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       balanceSlack: Double,
       partitionLocs: PartitionLocations): PartitionGroup = {
     val slack = (balanceSlack * prev.partitions.length).toInt
-    val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
     // least loaded pref locs
-    val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+    val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
     val prefPart = if (pref == Nil) None else pref.head
 
     val r1 = rnd.nextInt(groupArr.size)