Commit c3816de5 authored by Patrick Wendell

Changing option wording per discussion with Andrew

parent 5d61e051
core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -32,7 +32,7 @@ case class Aggregator[K, V, C] (
     mergeCombiners: (C, C) => C) {
 
   private val sparkConf = SparkEnv.get.conf
-  private val externalSorting = sparkConf.getBoolean("spark.shuffle.external", true)
+  private val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
 
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
     if (!externalSorting) {
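For context, the renamed key is read straight off the job's SparkConf, exactly as the patched line does. A minimal, hypothetical driver-side sketch (app name and master are placeholders, not from the commit) of toggling the renamed option and reading it back the same way:

    import org.apache.spark.SparkConf

    // Hypothetical setup, not part of the commit: toggling the renamed flag.
    val conf = new SparkConf()
      .setAppName("spill-demo")             // placeholder app name
      .setMaster("local[2]")                // placeholder master
      .set("spark.shuffle.spill", "false")  // disable external spilling

    // Aggregator reads it back the same way the patched line does,
    // defaulting to true when the key is unset.
    val externalSorting = conf.getBoolean("spark.shuffle.spill", true)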
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -81,7 +81,7 @@ private[spark] class BlockManager(
   // Whether to compress RDD partitions that are stored serialized
   val compressRdds = conf.getBoolean("spark.rdd.compress", false)
   // Whether to compress shuffle output temporarily spilled to disk
-  val compressExternalShuffle = conf.getBoolean("spark.shuffle.external.compress", false)
+  val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", false)
 
   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
@@ -792,7 +792,7 @@ private[spark] class BlockManager(
       case ShuffleBlockId(_, _, _) => compressShuffle
       case BroadcastBlockId(_) => compressBroadcast
       case RDDBlockId(_, _) => compressRdds
-      case TempBlockId(_) => compressExternalShuffle
+      case TempBlockId(_) => compressShuffleSpill
       case _ => false
     }
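The second hunk is the single dispatch point deciding compression per block type; temporary blocks (spilled shuffle data) now route to the spill setting under its new name. A self-contained sketch of the same pattern, using simplified stand-in types rather than Spark's real BlockId hierarchy:

    // Simplified stand-ins to show the dispatch pattern in isolation;
    // these are NOT the real org.apache.spark.storage classes.
    sealed trait BlockId
    case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId
    case class BroadcastBlockId(broadcastId: Long) extends BlockId
    case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
    case class TempBlockId(id: java.util.UUID) extends BlockId

    // Per-category switches, mirroring the vals read from SparkConf above.
    val compressShuffle = true
    val compressBroadcast = true
    val compressRdds = false
    val compressShuffleSpill = false  // spark.shuffle.spill.compress

    def shouldCompress(id: BlockId): Boolean = id match {
      case ShuffleBlockId(_, _, _) => compressShuffle
      case BroadcastBlockId(_)     => compressBroadcast
      case RDDBlockId(_, _)        => compressRdds
      case TempBlockId(_)          => compressShuffleSpill  // spilled shuffle data
      case _                       => false
    }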
core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -87,9 +87,9 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   // batches, with each batch using its own serialization stream. This cuts down on the size
   // of reference-tracking maps constructed when deserializing a stream.
   //
-  // NOTE: Setting this too low can cause excess copying when serializing, since some serailizers
+  // NOTE: Setting this too low can cause excess copying when serializing, since some serializers
   // grow internal data structures by growing + copying every time the number of objects doubles.
-  private val serializerBatchSize = sparkConf.getLong("spark.shuffle.external.batchSize", 10000)
+  private val serializerBatchSize = sparkConf.getLong("spark.shuffle.spill.batchSize", 10000)
 
   // How many times we have spilled so far
   private var spillCount = 0
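The NOTE captures a real trade-off: starting a fresh serialization stream per batch keeps each stream's reference-tracking table small, but tiny batches make serializers grow-and-copy their internal buffers more often. A rough illustration of the batching idea using plain JDK serialization (ExternalAppendOnlyMap itself goes through Spark's Serializer abstraction, so this is an analogy, not the actual implementation):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Analogy only: reset() clears ObjectOutputStream's identity/handle
    // table every `batchSize` objects, which is what keeps the matching
    // reader's reference-tracking map from growing with the whole stream.
    def writeBatched[T](items: Iterator[T], batchSize: Int = 10000): Array[Byte] = {
      val bytes = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(bytes)
      items.grouped(batchSize).foreach { batch =>
        batch.foreach(item => out.writeObject(item))
        out.reset()  // forget previously written objects before the next batch
      }
      out.close()
      bytes.toByteArray
    }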
docs/configuration.md
@@ -116,7 +116,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>0.3</td>
   <td>
     Fraction of Java heap to use for aggregation and cogroups during shuffles, if
-    <code>spark.shuffle.external</code> is true. At any given time, the collective size of
+    <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
     all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
     begin to spill to disk. If spills are often, consider increasing this value at the expense of
     <code>spark.storage.memoryFraction</code>.
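To make the documented bound concrete (illustrative numbers, not from the commit): the fraction is of the executor's Java heap, so with an 8 GB heap and the default of 0.3 shown here, all in-memory shuffle maps together may reach roughly 8 GB × 0.3 ≈ 2.4 GB before contents start spilling to disk. Raising the fraction buys fewer spills at the direct expense of the spark.storage.memoryFraction cache budget, since both carve from the same heap.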
@@ -155,7 +155,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.shuffle.external.compress</td>
+  <td>spark.shuffle.spill.compress</td>
   <td>false</td>
   <td>
     Whether to compress data spilled during shuffles.
@@ -395,7 +395,7 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td>spark.shuffle.external</td>
+  <td>spark.shuffle.spill</td>
   <td>true</td>
   <td>
     If set to "true", limits the amount of memory used during reduces by spilling data out to disk. This spilling
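Taken together, the rename leaves all three options under one spark.shuffle.spill.* prefix. A hedged sketch of setting them in one place (values are illustrative, not recommendations):

    import org.apache.spark.SparkConf

    // Illustrative values only.
    val conf = new SparkConf()
      .set("spark.shuffle.spill", "true")             // allow spilling during reduces (default)
      .set("spark.shuffle.spill.compress", "true")    // compress data spilled to disk
      .set("spark.shuffle.spill.batchSize", "10000")  // objects per serialization batch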