Commit e4c51d21 authored by Andrew Or

Address Patrick's and Reynold's comments

Aside from trivial formatting changes, use nulls instead of Options for
DiskMapIterator, and add documentation for spark.shuffle.externalSorting
and spark.shuffle.memoryFraction.

Also, set spark.shuffle.memoryFraction to 0.3 and spark.storage.memoryFraction to 0.6.
parent 372a533a
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+  private val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     if (!externalSorting) {
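The hunk above replaces a string fetch plus toBoolean with SparkConf's typed getter and flips the default so that external sorting is on out of the box. A minimal sketch of the two read styles, using only the SparkConf calls visible in the diff; the object and main method are illustrative scaffolding, not part of the commit.

import org.apache.spark.SparkConf

object ExternalSortingFlag {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)   // loadDefaults = false: ignore system properties here

    // Old style: fetch a string, parse it, default off
    val oldStyle = conf.get("spark.shuffle.externalSorting", "false").toBoolean
    // New style: typed getter, default flipped to on
    val newStyle = conf.getBoolean("spark.shuffle.externalSorting", true)

    println(s"old default: $oldStyle, new default: $newStyle")   // old default: false, new default: true
  }
}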
@@ -106,8 +106,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
   override val partitioner = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
-    val externalSorting = sparkConf.get("spark.shuffle.externalSorting", "false").toBoolean
+    val externalSorting = sparkConf.getBoolean("spark.shuffle.externalSorting", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = split.deps.size
@@ -864,7 +864,7 @@ private[spark] object BlockManager extends Logging {
   val ID_GENERATOR = new IdGenerator
 
   def getMaxMemory(conf: SparkConf): Long = {
-    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66)
+    val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
     (Runtime.getRuntime.maxMemory * memoryFraction).toLong
   }
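The storage default drops from 0.66 to 0.6 of the heap. A worked sketch of getMaxMemory's arithmetic under an assumed 1 GiB executor heap; the heap size is hypothetical, the real code reads Runtime.getRuntime.maxMemory.

object StorageFractionExample {
  def main(args: Array[String]): Unit = {
    val maxHeap = 1024L * 1024 * 1024        // assume a 1 GiB executor heap
    val mb = 1024 * 1024
    val oldLimit = (maxHeap * 0.66).toLong   // the old default fraction
    val newLimit = (maxHeap * 0.6).toLong    // the new default fraction
    // Prints "storage cache limit: 675 MB -> 614 MB"
    println(s"storage cache limit: ${oldLimit / mb} MB -> ${newLimit / mb} MB")
  }
}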
@@ -71,21 +71,24 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   // Collective memory threshold shared across all running tasks
   private val maxMemoryThreshold = {
-    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.75)
+    val memoryFraction = sparkConf.getDouble("spark.shuffle.memoryFraction", 0.3)
     val safetyFraction = sparkConf.getDouble("spark.shuffle.safetyFraction", 0.8)
     (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
   }
 
-  // How many inserts into this map before tracking its shuffle memory usage
-  private val initialInsertThreshold =
-    sparkConf.getLong("spark.shuffle.initialInsertThreshold", 1000)
+  // Number of pairs in the in-memory map
+  private var numPairsInMemory = 0
+
+  // Number of in-memory pairs inserted before tracking the map's shuffle memory usage
+  private val trackMemoryThreshold = 1000
+
+  // How many times we have spilled so far
+  private var spillCount = 0
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
-  private val syncWrites = sparkConf.get("spark.shuffle.sync", "false").toBoolean
+  private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
   private val comparator = new KCComparator[K, C]
   private val ser = serializer.newInstance()
-  private var insertCount = 0
-  private var spillCount = 0
 
   /**
    * Insert the given key and value into the map.
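With the new defaults, the collective shuffle budget works out to memoryFraction x safetyFraction = 0.3 x 0.8 = 24% of the heap, which sits comfortably next to the 0.6 storage fraction. A sketch of the same arithmetic; the 1 GiB heap below is assumed, not real.

object ShuffleThresholdExample {
  def main(args: Array[String]): Unit = {
    val maxHeap = 1024L * 1024 * 1024   // assume a 1 GiB executor heap
    val memoryFraction = 0.3            // new spark.shuffle.memoryFraction default
    val safetyFraction = 0.8            // spark.shuffle.safetyFraction default

    // Same formula as maxMemoryThreshold, shared across all running tasks
    val maxMemoryThreshold = (maxHeap * memoryFraction * safetyFraction).toLong
    println(s"collective in-memory map budget: ${maxMemoryThreshold / (1024 * 1024)} MB")   // 245 MB
  }
}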
@@ -94,14 +97,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    * enough room for this to happen. If so, allocate the memory required to grow the map;
    * otherwise, spill the in-memory map to disk.
    *
-   * The shuffle memory usage of the first initialInsertThreshold entries is not tracked.
+   * The shuffle memory usage of the first trackMemoryThreshold entries is not tracked.
    */
   def insert(key: K, value: V) {
-    insertCount += 1
     val update: (Boolean, C) => C = (hadVal, oldVal) => {
       if (hadVal) mergeValue(oldVal, value) else createCombiner(value)
     }
-    if (insertCount > initialInsertThreshold && currentMap.atGrowThreshold) {
+    if (numPairsInMemory > trackMemoryThreshold && currentMap.atGrowThreshold) {
       val mapSize = currentMap.estimateSize()
       var shouldSpill = false
       val shuffleMemoryMap = SparkEnv.get.shuffleMemoryMap
@@ -114,7 +116,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
         val availableMemory = maxMemoryThreshold -
           (shuffleMemoryMap.values.sum - previouslyOccupiedMemory.getOrElse(0L))
 
-        // Assume map grow factor is 2x
+        // Assume map growth factor is 2x
         shouldSpill = availableMemory < mapSize * 2
         if (!shouldSpill) {
           shuffleMemoryMap(threadId) = mapSize * 2
@@ -126,6 +128,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       }
     }
     currentMap.changeValue(key, update)
+    numPairsInMemory += 1
   }
 
   /**
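The insert() path only starts tracking shuffle memory after trackMemoryThreshold pairs, then decides whether to spill by comparing twice the estimated map size against what is left of the shared budget. A stripped-down sketch of that decision, with the shared shuffleMemoryMap modeled as a plain HashMap keyed by thread ID; the helper names and sizes are made up for illustration.

import scala.collection.mutable

object SpillDecisionSketch {
  // e.g. 0.3 * 0.8 of a 1 GiB heap, rounded to 245 MB
  val maxMemoryThreshold: Long = 245L * 1024 * 1024
  val shuffleMemoryMap = mutable.HashMap[Long, Long]()   // thread ID -> bytes claimed

  def shouldSpill(threadId: Long, estimatedMapSize: Long): Boolean = shuffleMemoryMap.synchronized {
    // Budget left once everyone else's claims (and this thread's stale claim) are subtracted
    val previouslyClaimed = shuffleMemoryMap.get(threadId)
    val availableMemory = maxMemoryThreshold -
      (shuffleMemoryMap.values.sum - previouslyClaimed.getOrElse(0L))

    // Assume the map doubles when it grows: spill unless twice the current size still fits
    val spill = availableMemory < estimatedMapSize * 2
    if (!spill) {
      shuffleMemoryMap(threadId) = estimatedMapSize * 2   // claim room for the doubled map
    }
    spill
  }

  def main(args: Array[String]): Unit = {
    val me = Thread.currentThread().getId
    shuffleMemoryMap(me + 1) = 100L * 1024 * 1024         // pretend another task claimed 100 MB
    // Needs 2 * 80 MB = 160 MB, but only ~145 MB of the 245 MB budget is left: spill
    println(shouldSpill(me, 80L * 1024 * 1024))           // true
  }
}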
@@ -133,7 +136,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
    */
   private def spill(mapSize: Long) {
     spillCount += 1
-    logWarning("* Spilling in-memory map of %d MB to disk (%d time%s so far)"
+    logWarning("Spilling in-memory map of %d MB to disk (%d time%s so far)"
       .format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
     val (blockId, file) = diskBlockManager.createTempBlock()
     val writer =
@@ -157,9 +160,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     shuffleMemoryMap.synchronized {
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
-    insertCount = 0
+    numPairsInMemory = 0
   }
 
+  /**
+   * Return an iterator that merges the in-memory map with the spilled maps.
+   * If no spill has occurred, simply return the in-memory map's iterator.
+   */
   override def iterator: Iterator[(K, C)] = {
     if (spilledMaps.isEmpty) {
       currentMap.iterator
@@ -168,7 +175,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  /** An iterator that sort-merges (K, C) pairs from the in-memory and on-disk maps */
+  /**
+   * An iterator that sort-merges (K, C) pairs from the in-memory map and the spilled maps
+   */
   private class ExternalIterator extends Iterator[(K, C)] {
 
     // A fixed-size queue that maintains a buffer for each stream we are currently merging
@@ -179,7 +188,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     val sortedMap = currentMap.destructiveSortedIterator(comparator)
     val inputStreams = Seq(sortedMap) ++ spilledMaps
 
-    inputStreams.foreach{ it =>
+    inputStreams.foreach { it =>
       val kcPairs = getMorePairs(it)
       mergeHeap.enqueue(StreamBuffer(it, kcPairs))
     }
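The ExternalIterator primes one buffer per sorted input stream and keeps them in a heap ordered by the smallest key hash still buffered. A toy sketch of that ordering using a reversed PriorityQueue; StreamBuf stands in for the real StreamBuffer, and the sample data is invented.

import scala.collection.mutable

object MergeHeapSketch {
  case class StreamBuf(pairs: mutable.ArrayBuffer[(String, Int)]) {
    def minKeyHash: Int = pairs.head._1.hashCode
  }

  def main(args: Array[String]): Unit = {
    // Scala's PriorityQueue is a max-heap, so reverse the ordering to pop the lowest hash first
    val mergeHeap =
      new mutable.PriorityQueue[StreamBuf]()(Ordering.by[StreamBuf, Int](_.minKeyHash).reverse)

    mergeHeap.enqueue(StreamBuf(mutable.ArrayBuffer("apple" -> 1, "apple" -> 2)))
    mergeHeap.enqueue(StreamBuf(mutable.ArrayBuffer("banana" -> 3)))

    val minBuffer = mergeHeap.dequeue()   // the stream whose next key has the smallest hash
    println(minBuffer.pairs.head._1)
  }
}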
@@ -187,6 +196,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     /**
      * Fetch from the given iterator until a key of different hash is retrieved. In the
      * event of key hash collisions, this ensures no pairs are hidden from being merged.
+     * Assume the given iterator is in sorted order.
      */
     def getMorePairs(it: Iterator[(K, C)]): ArrayBuffer[(K, C)] = {
       val kcPairs = new ArrayBuffer[(K, C)]
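getMorePairs batches every pair whose key shares the current minimum hash, so colliding keys never get split across batches. A sketch of the same idea over a hash-sorted iterator, using a BufferedIterator for lookahead; the helper name and sample keys are illustrative ("Aa" and "BB" are a classic String.hashCode collision).

import scala.collection.mutable.ArrayBuffer

object GetMorePairsSketch {
  def sameHashBatch[K, C](it: BufferedIterator[(K, C)]): ArrayBuffer[(K, C)] = {
    val kcPairs = new ArrayBuffer[(K, C)]
    if (it.hasNext) {
      val batchHash = it.head._1.hashCode
      // Keep reading while the next key still hashes to the batch's hash
      while (it.hasNext && it.head._1.hashCode == batchHash) {
        kcPairs += it.next()
      }
    }
    kcPairs
  }

  def main(args: Array[String]): Unit = {
    // "Aa" and "BB" collide in String.hashCode, so both land in the first batch
    val sorted = Iterator(("Aa", 1), ("BB", 2), ("cat", 3)).buffered
    println(sameHashBatch(sorted))   // ArrayBuffer((Aa,1), (BB,2))
    println(sameHashBatch(sorted))   // ArrayBuffer((cat,3))
  }
}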
@@ -219,17 +229,16 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       baseCombiner
     }
 
-    override def hasNext: Boolean = {
-      mergeHeap.foreach{ buffer =>
-        if (!buffer.pairs.isEmpty) {
-          return true
-        }
-      }
-      false
-    }
+    /**
+     * Return true if there exists an input stream that still has unvisited pairs
+     */
+    override def hasNext: Boolean = mergeHeap.exists(!_.pairs.isEmpty)
 
+    /**
+     * Select a key with the minimum hash, then combine all values with the same key from all input streams.
+     */
     override def next(): (K, C) = {
-      // Select a return key from the StreamBuffer that holds the lowest key hash
+      // Select a key from the StreamBuffer that holds the lowest key hash
       val minBuffer = mergeHeap.dequeue()
       val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash)
       if (minPairs.length == 0) {
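The hasNext rewrite above swaps an explicit foreach with an early return for an equivalent exists call. A tiny before/after sketch of that refactor on a stand-in buffer type (not the real StreamBuffer).

import scala.collection.mutable.ArrayBuffer

object HasNextRewriteSketch {
  case class Buffer(pairs: ArrayBuffer[(String, Int)])

  // The old shape: iterate and bail out as soon as a non-empty buffer is found
  def hasNextOld(heap: Iterable[Buffer]): Boolean = {
    heap.foreach { buffer =>
      if (!buffer.pairs.isEmpty) {
        return true
      }
    }
    false
  }

  // The new shape: the same check expressed as exists
  def hasNextNew(heap: Iterable[Buffer]): Boolean = heap.exists(!_.pairs.isEmpty)

  def main(args: Array[String]): Unit = {
    val heap = Seq(Buffer(ArrayBuffer()), Buffer(ArrayBuffer("k" -> 1)))
    println(hasNextOld(heap) == hasNextNew(heap))   // true: both see the non-empty stream
  }
}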
@@ -285,45 +294,43 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
     }
   }
 
-  // Iterate through (K, C) pairs in sorted order from an on-disk map
+  /**
+   * An iterator that returns (K, C) pairs in sorted order from an on-disk map
+   */
   private class DiskMapIterator(file: File) extends Iterator[(K, C)] {
     val fileStream = new FileInputStream(file)
     val bufferedStream = new FastBufferedInputStream(fileStream)
     val deserializeStream = ser.deserializeStream(bufferedStream)
-    var nextItem: Option[(K, C)] = None
+    var nextItem: (K, C) = null
     var eof = false
 
-    def readNextItem(): Option[(K, C)] = {
+    def readNextItem(): (K, C) = {
       if (!eof) {
         try {
-          return Some(deserializeStream.readObject().asInstanceOf[(K, C)])
+          return deserializeStream.readObject().asInstanceOf[(K, C)]
         } catch {
           case e: EOFException =>
            eof = true
            cleanup()
        }
      }
-      None
+      null
    }
 
    override def hasNext: Boolean = {
-      nextItem match {
-        case Some(item) => true
-        case None =>
-          nextItem = readNextItem()
-          nextItem.isDefined
+      if (nextItem == null) {
+        nextItem = readNextItem()
      }
+      nextItem != null
    }
 
    override def next(): (K, C) = {
-      nextItem match {
-        case Some(item) =>
-          nextItem = None
-          item
-        case None =>
-          val item = readNextItem()
-          item.getOrElse(throw new NoSuchElementException)
+      val item = if (nextItem == null) readNextItem() else nextItem
+      if (item == null) {
+        throw new NoSuchElementException
      }
+      nextItem = null
+      item
    }
 
    // TODO: Ensure this gets called even if the iterator isn't drained.
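The DiskMapIterator change is the "nulls instead of Options" item from the commit message: readNextItem() now returns null at end of stream, hasNext caches one looked-ahead item, and next() hands it out exactly once or throws. A self-contained sketch of the same pattern over a trivial in-memory source; LineSource is an invented stand-in for the deserialization stream, not Spark code.

object NullSentinelIteratorSketch {
  class LineSource(lines: Array[String]) {
    private var i = 0
    // Mirrors readNextItem(): null once the source is exhausted
    def readNextItem(): String = {
      if (i < lines.length) { val item = lines(i); i += 1; item } else null
    }
  }

  class SourceIterator(source: LineSource) extends Iterator[String] {
    private var nextItem: String = null

    override def hasNext: Boolean = {
      if (nextItem == null) {
        nextItem = source.readNextItem()   // look ahead only when nothing is cached
      }
      nextItem != null
    }

    override def next(): String = {
      val item = if (nextItem == null) source.readNextItem() else nextItem
      if (item == null) {
        throw new NoSuchElementException
      }
      nextItem = null                      // clear the cache so hasNext reads ahead again
      item
    }
  }

  def main(args: Array[String]): Unit = {
    val it = new SourceIterator(new LineSource(Array("a", "b")))
    println(it.toList)   // List(a, b)
  }
}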
@@ -104,13 +104,24 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td>spark.storage.memoryFraction</td>
-  <td>0.66</td>
+  <td>0.6</td>
   <td>
     Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
-    generation of objects in the JVM, which by default is given 2/3 of the heap, but you can increase
+    generation of objects in the JVM, which by default is given 0.6 of the heap, but you can increase
     it if you configure your own old generation size.
   </td>
 </tr>
+<tr>
+  <td>spark.shuffle.memoryFraction</td>
+  <td>0.3</td>
+  <td>
+    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
+    <code>spark.shuffle.externalSorting</code> is enabled. At any given time, the collective size of
+    all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+    begin to spill to disk. If spills happen often, consider increasing this value at the expense of
+    <code>spark.storage.memoryFraction</code>.
+  </td>
+</tr>
 <tr>
   <td>spark.mesos.coarse</td>
   <td>false</td>
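The new documentation suggests trading spark.storage.memoryFraction for spark.shuffle.memoryFraction when shuffles spill frequently. A hedged example of what that looks like in application code; the 0.5 / 0.4 split is purely illustrative, not a recommendation.

import org.apache.spark.SparkConf

object MemoryFractionTuning {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .setAppName("memory-fraction-tuning")
      .set("spark.storage.memoryFraction", "0.5")    // down from the 0.6 default
      .set("spark.shuffle.memoryFraction", "0.4")    // up from the 0.3 default

    println(conf.getDouble("spark.storage.memoryFraction", 0.6))   // 0.5
    println(conf.getDouble("spark.shuffle.memoryFraction", 0.3))   // 0.4
  }
}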
@@ -376,6 +387,15 @@ Apart from these, the following properties are also available, and may be useful
     If set to "true", consolidates intermediate files created during a shuffle. Creating fewer files can improve filesystem performance for shuffles with large numbers of reduce tasks. It is recommended to set this to "true" when using ext4 or xfs filesystems. On ext3, this option might degrade performance on machines with many (>8) cores due to filesystem limitations.
   </td>
 </tr>
+<tr>
+  <td>spark.shuffle.externalSorting</td>
+  <td>true</td>
+  <td>
+    If set to "true", spills in-memory maps used for shuffles to disk when a memory threshold is reached. This
+    threshold is specified by <code>spark.shuffle.memoryFraction</code>. Enable this especially for memory-intensive
+    applications.
+  </td>
+</tr>
 <tr>
   <td>spark.speculation</td>
   <td>false</td>