Commit 46d98e0a authored by Felix Cheung, committed by Shivaram Venkataraman

[SPARK-16028][SPARKR] spark.lapply can work with active context

## What changes were proposed in this pull request?

spark.lapply and setLogLevel no longer take an explicit `sc` argument. Both functions now look up the active SparkContext (stored as `.sparkRjsc` in the SparkR package environment by sparkR.session()) and fail with a clear error message if no context has been initialized.
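
With an active session, call sites simplify accordingly. A minimal usage sketch based on the examples in this commit (assuming a local SparkR installation):

sparkR.session()
# No sc argument needed; the active context is resolved internally.
doubled <- spark.lapply(1:10, function(x) { 2 * x })  # list(2, 4, ..., 20)
setLogLevel("ERROR")
sparkR.session.stop()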

## How was this patch tested?

Unit tests exercising setLogLevel and spark.lapply with the new signatures.

shivaram thunterdb

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #13752 from felixcheung/rlapply.
parent c44bf137
@@ -252,17 +252,20 @@ setCheckpointDir <- function(sc, dirName) {
 #' }
 #'
 #' @rdname spark.lapply
-#' @param sc Spark Context to use
 #' @param list the list of elements
 #' @param func a function that takes one argument.
 #' @return a list of results (the exact type being determined by the function)
 #' @export
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' doubled <- spark.lapply(sc, 1:10, function(x){2 * x})
+#' sparkR.session()
+#' doubled <- spark.lapply(1:10, function(x){2 * x})
 #'}
-spark.lapply <- function(sc, list, func) {
+spark.lapply <- function(list, func) {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    stop("SparkR has not been initialized. Please call sparkR.session()")
+  }
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
   rdd <- parallelize(sc, list, length(list))
   results <- map(rdd, func)
   local <- collect(results)
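
The initialization guard added above is duplicated verbatim in setLogLevel below. A hypothetical helper, not part of this commit, that factors out the lookup might look like:

# Hypothetical helper (not in this commit): fetch the active SparkContext
# from the SparkR package environment, failing fast if none exists.
getSparkContext <- function() {
  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    stop("SparkR has not been initialized. Please call sparkR.session()")
  }
  get(".sparkRjsc", envir = .sparkREnv)
}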
@@ -274,14 +277,17 @@ spark.lapply <- function(sc, list, func) {
 #' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
 #'
 #' @rdname setLogLevel
-#' @param sc Spark Context to use
 #' @param level New log level
 #' @export
 #' @examples
 #'\dontrun{
-#' setLogLevel(sc, "ERROR")
+#' setLogLevel("ERROR")
 #'}
-setLogLevel <- function(sc, level) {
+setLogLevel <- function(level) {
+  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    stop("SparkR has not been initialized. Please call sparkR.session()")
+  }
+  sc <- get(".sparkRjsc", envir = .sparkREnv)
   callJMethod(sc, "setLogLevel", level)
 }
@@ -107,8 +107,8 @@ test_that("job group functions can be called", {
 })

 test_that("utility function can be called", {
-  sc <- sparkR.sparkContext()
-  setLogLevel(sc, "ERROR")
+  sparkR.sparkContext()
+  setLogLevel("ERROR")
   sparkR.session.stop()
 })
@@ -161,7 +161,7 @@ test_that("sparkJars sparkPackages as comma-separated strings", {

 test_that("spark.lapply should perform simple transforms", {
   sc <- sparkR.sparkContext()
-  doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
+  doubled <- spark.lapply(1:10, function(x) { 2 * x })
   expect_equal(doubled, as.list(2 * 1:10))
   sparkR.session.stop()
 })
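
A complementary test, not included in this commit, could pin down the new error path, assuming it runs before any context is created:

test_that("spark.lapply fails without an initialized context", {
  # The guard in spark.lapply should raise the initialization error.
  expect_error(spark.lapply(1:3, function(x) { x }),
               "SparkR has not been initialized")
})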