diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9fda0ec0e654233da3c0f72c3a1c1a3be9a72c4b..a5235b2bf66a4a900e296e8d42f1579925e5697f 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -272,11 +272,11 @@ In SparkR, we support several kinds of User-Defined Functions:
 
 ##### dapply
 Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame`
-and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function
-should be a `data.frame`. Schema specifies the row format of the resulting a `SparkDataFrame`. It must match the R function's output.
+should have only one parameter, to which a `data.frame` corresponding to that partition will be passed. The output of the function should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types](#data-type-mapping-between-r-and-spark) of the returned value.
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
+
 # Convert waiting time from hours to seconds.
 # Note that we can apply UDF to DataFrame.
 schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
@@ -295,8 +295,8 @@ head(collect(df1))
 
 ##### dapplyCollect
 Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function
-should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the
-output of UDF run on all the partitions can fit in driver memory.
+should be a `data.frame`, but the schema is not required to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all partitions cannot be pulled to the driver and fit in driver memory.
+
 
 <div data-lang="r" markdown="1">
 {% highlight r %}
@@ -316,6 +316,136 @@ head(ldf, 3)
 {% endhighlight %}
 </div>
 
+#### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`
+
+##### gapply
+Apply a function to each group of a `SparkDataFrame`. The function should have only two parameters: a grouping key and an R `data.frame` corresponding to
+that key. The groups are chosen from the `SparkDataFrame`'s column(s).
+The output of the function should be a `data.frame`. Schema specifies the row format of the resulting
+`SparkDataFrame`. It must represent the R function's output schema in terms of Spark data types. The column names of the returned `data.frame` are set by the user. Below is the data type mapping between R
+and Spark.
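+
+As a quick illustration of how the mapping is used, here is a minimal, hypothetical schema sketch (the column names `name` and `avg_value` are made up for illustration): each `structField` pairs a user-chosen column name with a Spark type drawn from the table below.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Hypothetical output schema: character and numeric columns of the
+# returned R data.frame map to Spark "string" and "double" respectively.
+exampleSchema <- structType(structField("name", "string"),
+                            structField("avg_value", "double"))
+{% endhighlight %}
+</div>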
+
+#### Data type mapping between R and Spark
+<table class="table">
+<tr><th>R</th><th>Spark</th></tr>
+<tr>
+  <td>byte</td>
+  <td>byte</td>
+</tr>
+<tr>
+  <td>integer</td>
+  <td>integer</td>
+</tr>
+<tr>
+  <td>float</td>
+  <td>float</td>
+</tr>
+<tr>
+  <td>double</td>
+  <td>double</td>
+</tr>
+<tr>
+  <td>numeric</td>
+  <td>double</td>
+</tr>
+<tr>
+  <td>character</td>
+  <td>string</td>
+</tr>
+<tr>
+  <td>string</td>
+  <td>string</td>
+</tr>
+<tr>
+  <td>binary</td>
+  <td>binary</td>
+</tr>
+<tr>
+  <td>raw</td>
+  <td>binary</td>
+</tr>
+<tr>
+  <td>logical</td>
+  <td>boolean</td>
+</tr>
+<tr>
+  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXct</a></td>
+  <td>timestamp</td>
+</tr>
+<tr>
+  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXlt</a></td>
+  <td>timestamp</td>
+</tr>
+<tr>
+  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html">Date</a></td>
+  <td>date</td>
+</tr>
+<tr>
+  <td>array</td>
+  <td>array</td>
+</tr>
+<tr>
+  <td>list</td>
+  <td>array</td>
+</tr>
+<tr>
+  <td>env</td>
+  <td>map</td>
+</tr>
+</table>
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Determine six waiting times with the largest eruption time in minutes.
+schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
+result <- gapply(
+    df,
+    "waiting",
+    function(key, x) {
+        y <- data.frame(key, max(x$eruptions))
+    },
+    schema)
+head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
+
+##    waiting max_eruption
+##1       64        5.100
+##2       69        5.067
+##3       71        5.033
+##4       87        5.000
+##5       63        4.933
+##6       89        4.900
+{% endhighlight %}
+</div>
+
+##### gapplyCollect
+Like `gapply`, apply a function to each group of a `SparkDataFrame` and collect the result back to an R `data.frame`. The output of the function should be a `data.frame`, but the schema is not required to be passed. Note that `gapplyCollect` can fail if the output of the UDF run on all groups cannot be pulled to the driver and fit in driver memory.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# Determine six waiting times with the largest eruption time in minutes.
+result <- gapplyCollect(
+    df,
+    "waiting",
+    function(key, x) {
+        y <- data.frame(key, max(x$eruptions))
+        colnames(y) <- c("waiting", "max_eruption")
+        y
+    })
+head(result[order(result$max_eruption, decreasing = TRUE), ])
+
+##    waiting max_eruption
+##1       64        5.100
+##2       69        5.067
+##3       71        5.033
+##4       87        5.000
+##5       63        4.933
+##6       89        4.900
+
+{% endhighlight %}
+</div>
+
 #### Run local R functions distributed using `spark.lapply`
 
 ##### spark.lapply
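+
+As a brief preview of this section, the following is a minimal sketch of how `spark.lapply` can be used: it runs a function over a list of elements and distributes the computation with Spark, returning the results as a local R list (so the combined output must fit in driver memory). The input values and function here are made up for illustration.
+
+<div data-lang="r" markdown="1">
+{% highlight r %}
+# Square each element of a list in parallel across the cluster.
+# The result is collected back as a local R list.
+squares <- spark.lapply(as.list(1:10), function(x) { x * x })
+{% endhighlight %}
+</div>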