diff --git a/docs/programming-guide.md b/docs/programming-guide.md index f5b775da7930a9b8a98eb57262139f207656a0cc..f4fabb0927b6623f092e0276e3d4338c43457d00 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -937,7 +937,7 @@ for details. <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td> </tr> <tr> - <td> <b>mapPartitions</b>(<i>func</i>) </td> + <td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td> <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. </td> </tr> @@ -964,7 +964,7 @@ for details. <td> Return a new dataset that contains the distinct elements of the source dataset.</td> </tr> <tr> - <td> <b>groupByKey</b>([<i>numTasks</i>]) </td> + <td> <b>groupByKey</b>([<i>numTasks</i>]) <a name="GroupByLink"></a> </td> <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. <br /> <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better @@ -975,25 +975,25 @@ for details. </td> </tr> <tr> - <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td> + <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) <a name="ReduceByLink"></a> </td> <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td> </tr> <tr> - <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td> + <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) <a name="AggregateByLink"></a> </td> <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td> </tr> <tr> - <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) </td> + <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numTasks</i>]) <a name="SortByLink"></a> </td> <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td> </tr> <tr> - <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td> + <td> <b>join</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="JoinLink"></a> </td> <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>. </td> </tr> <tr> - <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td> + <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) <a name="CogroupLink"></a> </td> <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called <code>groupWith</code>. </td> </tr> <tr> @@ -1006,17 +1006,17 @@ for details. process's stdin and lines output to its stdout are returned as an RDD of strings. </td> </tr> <tr> - <td> <b>coalesce</b>(<i>numPartitions</i>) </td> + <td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td> <td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. </td> </tr> <tr> <td> <b>repartition</b>(<i>numPartitions</i>) </td> <td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. - This always shuffles all data over the network. </td> + This always shuffles all data over the network. <a name="RepartitionLink"></a></td> </tr> <tr> - <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) </td> + <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td> <td> Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within each partition because it can push the sorting down into the shuffle machinery. </td> @@ -1080,7 +1080,7 @@ for details. <code>SparkContext.objectFile()</code>. </td> </tr> <tr> - <td> <b>countByKey</b>() </td> + <td> <b>countByKey</b>() <a name="CountByLink"></a> </td> <td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td> </tr> <tr> @@ -1090,6 +1090,67 @@ for details. </tr> </table> +### Shuffle operations + +Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's +mechanism for re-distributing data so that is grouped differently across partitions. This typically +involves copying data across executors and machines, making the shuffle a complex and +costly operation. + +#### Background + +To understand what happens during the shuffle we can consider the example of the +[`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all +values for a single key are combined into a tuple - the key and the result of executing a reduce +function against all values associated with that key. The challenge is that not all values for a +single key necessarily reside on the same partition, or even the same machine, but they must be +co-located to compute the result. + +In Spark, data is generally not distributed across partitions to be in the necessary place for a +specific operation. During computations, a single task will operate on a single partition - thus, to +organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an +all-to-all operation. It must read from all partitions to find all the values for all keys, +and then bring together values across partitions to compute the final result for each key - +this is called the **shuffle**. + +Although the set of elements in each partition of newly shuffled data will be deterministic, and so +is the ordering of partitions themselves, the ordering of these elements is not. If one desires predictably +ordered data following shuffle then it's possible to use: + +* `mapPartitions` to sort each partition using, for example, `.sorted` +* `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning +* `sortBy` to make a globally ordered RDD + +Operations which can cause a shuffle include **repartition** operations like +[`repartition`](#RepartitionLink), and [`coalesce`](#CoalesceLink), **'ByKey** operations +(except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and +**join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink). + +#### Performance Impact +The **Shuffle** is an expensive operation since it involves disk I/O, data serialization, and +network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to +organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from +MapReduce and does not directly relate to Spark's `map` and `reduce` operations. + +Internally, results from individual map tasks are kept in memory until they can't fit. Then, these +are sorted based on the target partition and written to a single file. On the reduce side, tasks +read the relevant sorted blocks. + +Certain shuffle operations can consume significant amounts of heap memory since they employ +in-memory data structures to organize records before or after transferring them. Specifically, +`reduceByKey` and `aggregateByKey` create these structures on the map side and `'ByKey` operations +generate these on the reduce side. When data does not fit in memory Spark will spill these tables +to disk, incurring the additional overhead of disk I/O and increased garbage collection. + +Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files +are not cleaned up from Spark's temporary storage until Spark is stopped, which means that +long-running Spark jobs may consume available disk space. This is done so the shuffle doesn't need +to be re-computed if the lineage is re-computed. The temporary storage directory is specified by the +`spark.local.dir` configuration parameter when configuring the Spark context. + +Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the +'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html). + ## RDD Persistence One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory