Skip to content
Snippets Groups Projects
Commit b3107446 authored by maji2014's avatar maji2014 Committed by Andrew Or
Browse files

[SPARK-4691][shuffle] Restructure a few lines in shuffle code

In HashShuffleReader.scala and HashShuffleWriter.scala, no need to judge "dep.aggregator.isEmpty" again as this is judged by "dep.aggregator.isDefined"

In SortShuffleWriter.scala, "dep.aggregator.isEmpty"  is better than "!dep.aggregator.isDefined" ?

Author: maji2014 <maji3@asiainfo.com>

Closes #3553 from maji2014/spark-4691 and squashes the following commits:

bf7b14d [maji2014] change a elegant way for SortShuffleWriter.scala
10d0cf0 [maji2014] change a elegant way
d8f52dc [maji2014] code optimization for judgement
parent 61f1a702
No related branches found
No related tags found
No related merge requests found
......@@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C](
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
......
......@@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V](
} else {
records
}
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
......
......@@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C](
/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment