diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index ea31baed3d97e91eaf46f835090f751d028d5bc9..002e469efba86bec6441efb0de268b7d7e8bdb19 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -295,6 +295,7 @@ export("as.DataFrame",
        "read.json",
        "read.parquet",
        "read.text",
+       "spark.lapply",
        "sql",
        "str",
        "tableToDF",
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 4105a6e5c825c54e7781d4d9f08ec5bfec58b04a..44bca877fd45af59aa0cf9e5d5afa45034027736 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
+#' @title Run a function over a list of elements, distributing the computations with Spark.
+#'
+#' @description
+#' Applies a function in a manner that is similar to doParallel or lapply to elements of a list.
+#' The computations are distributed using Spark. It is conceptually the same as the following code:
+#' lapply(list, func)
+#'
+#' Known limitations:
+#' - variable scoping and capture: compared to R's rich support for variable resolution, the
+#' distributed nature of SparkR limits how variables are resolved at runtime. All the variables
+#' that are available through lexical scoping are embedded in the closure of the function and
+#' available as read-only variables within the function. Environment variables should be
+#' stored in temporary variables outside the function, and not accessed directly within the
+#' function.
+#'
+#' - loading external packages: In order to use a package, you need to load it inside the
+#' closure. For example, if you rely on the MASS package, here is how you would use it:
+#'\dontrun{
+#' train <- function(hyperparam) {
+#'   library(MASS)
+#'   model <- lm.ridge("y ~ x+z", data, lambda = hyperparam)
+#'   model
+#' }
+#'}
+#'
+#' @rdname spark.lapply
+#' @param sc Spark Context to use
+#' @param list the list of elements
+#' @param func a function that takes one argument.
+#' @return a list of results (the exact type being determined by the function)
+#' @export
+#' @examples
+#'\dontrun{
+#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+#'}
+spark.lapply <- function(sc, list, func) {
+  rdd <- parallelize(sc, list, length(list))
+  results <- map(rdd, func)
+  local <- collect(results)
+  local
+}
+
 #' Set new log level
 #'
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index ffa067eb5ea167b2e14425a3430c7e06850873e3..ca04342cd5124b048e300220a1252ae45fecdd8a 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
   expect_that(processSparkJars(f), not(gives_warning()))
   expect_match(processSparkJars(f), f)
 })
+
+test_that("spark.lapply should perform simple transforms", {
+  sc <- sparkR.init()
+  doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+  expect_equal(doubled, as.list(2 * 1:10))
+})
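
A minimal usage sketch of the new API under the two limitations documented in the roxygen block above. It assumes a local Spark installation reachable from sparkR.init(); the SCALE_FACTOR environment variable and the ridge-regression data frame are invented for illustration and are not part of the patch.

library(SparkR)
sc <- sparkR.init()

# Variable scoping: copy values from the environment into a local variable
# outside the closure; the closure then captures it (read-only) through
# lexical scoping and it is shipped to the workers with the function.
mult <- as.numeric(Sys.getenv("SCALE_FACTOR", "2"))
scaled <- spark.lapply(sc, 1:10, function(x) { mult * x })

# External packages: load them inside the closure so that every worker loads
# the package before it is used; here MASS is loaded before calling lm.ridge.
models <- spark.lapply(sc, c(0.1, 1, 10), function(lambda) {
  library(MASS)
  data <- data.frame(y = rnorm(20), x = rnorm(20), z = rnorm(20))
  lm.ridge(y ~ x + z, data = data, lambda = lambda)
})

sparkR.stop()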