Commit 4bdfb7ba authored by Ilya Ganelin's avatar Ilya Ganelin Committed by Sean Owen

[SPARK-5750][SPARK-3441][SPARK-5836][CORE] Added documentation explaining shuffle

I've updated the Spark Programming Guide to add a section on the shuffle operation, providing some background on what it does. I've also documented some of its performance impacts.

I've included documentation to address the following issues:
https://issues.apache.org/jira/browse/SPARK-5836
https://issues.apache.org/jira/browse/SPARK-3441
https://issues.apache.org/jira/browse/SPARK-5750

https://issues.apache.org/jira/browse/SPARK-4227 is related but can be addressed in a separate PR since it involves updates to the Spark Configuration Guide.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Author: Ilya Ganelin <ilganeli@gmail.com>

Closes #5074 from ilganeli/SPARK-5750 and squashes the following commits:

6178e24 [Ilya Ganelin] Update programming-guide.md
7a0b96f [Ilya Ganelin] Update programming-guide.md
2c5df08 [Ilya Ganelin] Merge branch 'SPARK-5750' of github.com:ilganeli/spark into SPARK-5750
dffbd2d [Ilya Ganelin] [SPARK-5750] Slight wording update
1ff4eb4 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
85f9c6e [Ilya Ganelin] Update programming-guide.md
349d1fa [Ilya Ganelin] Added cross link for configuration page
eeb5a7a [Ilya Ganelin] [SPARK-5750] Added some minor fixes
dd5cc9d [Ilya Ganelin] [SPARK-5750] Fixed some factual inaccuracies with regards to shuffle internals.
a8adb57 [Ilya Ganelin] [SPARK-5750] Incorporated feedback from Sean Owen
9954bbe [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-5750
159dd1c [Ilya Ganelin] [SPARK-5750] Style fixes from rxin.
75ef67b [Ilya Ganelin] [SPARK-5750][SPARK-3441][SPARK-5836] Added documentation explaining the shuffle operation and included errata from a number of other JIRAs
parent de673303
@@ -937,7 +937,7 @@ for details.
<td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
</tr>
<tr>
<td> <b>mapPartitions</b>(<i>func</i>) </td>
<td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
<td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
</tr>
@@ -964,7 +964,7 @@ for details.
<td> Return a new dataset that contains the distinct elements of the source dataset.</td>
</tr>
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
@@ -975,25 +975,25 @@ for details.
</td>
</tr>
<tr>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td>
<td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
</tr>
<tr>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
</td>
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
@@ -1006,17 +1006,17 @@ for details.
process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
</tr>
<tr>
<td> <b>coalesce</b>(<i>numPartitions</i>) </td>
<td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
<td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
after filtering down a large dataset. </td>
</tr>
<tr>
<td> <b>repartition</b>(<i>numPartitions</i>) </td>
<td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network. </td>
This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
</tr>
<tr>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td>
<td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
<td> Repartition the RDD according to the given partitioner and, within each resulting partition,
sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
each partition because it can push the sorting down into the shuffle machinery. </td>
@@ -1080,7 +1080,7 @@ for details.
<code>SparkContext.objectFile()</code>. </td>
</tr>
<tr>
<td> <b>countByKey</b>() </td>
<td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
<td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
</tr>
<tr>
@@ -1090,6 +1090,67 @@ for details.
</tr>
</table>
### Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
mechanism for re-distributing data so that it is grouped differently across partitions. This
typically involves copying data across executors and machines, making the shuffle a complex and
costly operation.
#### Background

To understand what happens during the shuffle, consider the example of the
[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD in which
all values for a single key are combined into a tuple - the key and the result of executing a
reduce function against all values associated with that key. The challenge is that not all values
for a single key necessarily reside on the same partition, or even on the same machine, yet they
must be co-located to compute the result.

In Spark, data is generally not distributed across partitions so as to be in the right place for a
specific operation. During computations, a single task operates on a single partition - thus, to
organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
all-to-all operation. It must read from all partitions to find all the values for all keys,
and then bring together values across partitions to compute the final result for each key -
this is called the **shuffle**.
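
As a concrete illustration, here is a minimal word-count sketch; the input data and partition
count are hypothetical, chosen only to show that values for a key may start out on different
partitions and are brought together by the shuffle:

```scala
// Hypothetical data spread across 3 partitions.
val words = sc.parallelize(Seq("cat", "dog", "cat", "fish", "dog", "cat"), 3)

// Pairs of (word, 1); values for the same word may live on different partitions.
val pairs = words.map(word => (word, 1))

// reduceByKey must co-locate all values for each key - triggering a shuffle -
// before it can sum the counts per word.
val counts = pairs.reduceByKey(_ + _)

counts.collect()  // e.g. Array((cat,3), (dog,2), (fish,1))
```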

Although the set of elements in each partition of newly shuffled data is deterministic, and so
is the ordering of the partitions themselves, the ordering of the elements within a partition is
not. If predictably ordered data is desired following a shuffle, then it is possible to use one of
the following (sketched in the example after this list):

* `mapPartitions` to sort each partition using, for example, `.sorted`
* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
* `sortBy` to make a globally ordered RDD
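
A minimal sketch of these three options, using hypothetical (key, value) data:

```scala
import org.apache.spark.HashPartitioner

// Hypothetical (key, value) pairs; element order within partitions after a
// shuffle would otherwise be non-deterministic.
val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (1, "d")))

// mapPartitions: sort the elements of each partition locally.
val perPartitionSorted = pairs.mapPartitions(iter => iter.toArray.sorted.iterator)

// repartitionAndSortWithinPartitions: repartition and sort by key in one pass.
val repartitionSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// sortBy: produce a globally ordered RDD.
val globallySorted = pairs.sortBy(_._1)
```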

Operations which can cause a shuffle include **repartition** operations like
[`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
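
One way to check whether an operation introduces a shuffle is to inspect an RDD's lineage with
`toDebugString`; a `ShuffledRDD` appearing in the output indicates a shuffle boundary. A minimal
sketch with hypothetical data:

```scala
val pairs = sc.parallelize(1 to 100).map(x => (x % 10, x))

// The lineage of the reduceByKey result includes a ShuffledRDD,
// showing that a shuffle stage was introduced.
println(pairs.reduceByKey(_ + _).toDebugString)
```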
#### Performance Impact

The **shuffle** is an expensive operation since it involves disk I/O, data serialization, and
network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
MapReduce and does not directly relate to Spark's `map` and `reduce` operations.

Internally, results from individual map tasks are kept in memory until they no longer fit. Then,
they are sorted based on the target partition and written to a single file. On the reduce side,
tasks read the relevant sorted blocks.

Certain shuffle operations can consume significant amounts of heap memory since they employ
in-memory data structures to organize records before or after transferring them. Specifically,
`reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
generate them on the reduce side. When data does not fit in memory, Spark will spill these tables
to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
are not cleaned up from Spark's temporary storage until Spark is stopped, which means that
long-running Spark jobs may consume a large amount of disk space. The files are retained so that
shuffle output does not need to be re-created if the lineage is re-computed. The temporary storage
directory is specified by the `spark.local.dir` configuration parameter when configuring the Spark
context.

Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
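
As one illustration, here is a sketch of setting the temporary storage directory and one
shuffle-related parameter through `SparkConf` when constructing the context. The directory path is
a hypothetical example, and parameter names can vary between Spark versions, so consult the
configuration guide for your release:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-example")
  // Hypothetical path for shuffle spill and intermediate files.
  .set("spark.local.dir", "/mnt/spark-tmp")
  // Compress map output files (the default in recent releases).
  .set("spark.shuffle.compress", "true")

val sc = new SparkContext(conf)
```

Note that in some deployments the cluster manager overrides `spark.local.dir` (for example, YARN
configures its own local directories).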
## RDD Persistence
One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory