diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 725cbf24f236d90cc40c559187b69709a1d97a84..f856979c2a814d688b2657bebf106c040454900e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -55,7 +55,6 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
   .Object
 })
 
-#' @rdname SparkDataFrame
 #' @export
 #' @param sdf A Java object reference to the backing Scala DataFrame
 #' @param isCached TRUE if the SparkDataFrame is cached
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index dd0ceaeb08a5d34f8461a269540b9804468a32be..2538bb25073e16013862d0b0c8eeb78c7997301f 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -264,10 +264,7 @@ setCheckpointDir <- function(sc, dirName) {
 #'}
 #' @note spark.lapply since 2.0.0
 spark.lapply <- function(list, func) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   rdd <- parallelize(sc, list, length(list))
   results <- map(rdd, func)
   local <- collect(results)
@@ -287,9 +284,6 @@ spark.lapply <- function(list, func) {
 #'}
 #' @note setLogLevel since 2.0.0
 setLogLevel <- function(level) {
-  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
-    stop("SparkR has not been initialized. Please call sparkR.session()")
-  }
-  sc <- get(".sparkRjsc", envir = .sparkREnv)
+  sc <- getSparkContext()
   callJMethod(sc, "setLogLevel", level)
 }
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 2b6e124151397725338e7c29f6cc76b55c40ba50..62659b0c0ce5fec69d0cc4c40692cd099451c9d0 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -392,47 +392,91 @@ sparkR.session <- function(
 #' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
 #' different value or cleared.
 #'
-#' @param sc existing spark context
 #' @param groupid the ID to be assigned to job groups
 #' @param description description for the job group ID
 #' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation
+#' @rdname setJobGroup
+#' @name setJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' setJobGroup(sc, "myJobGroup", "My job group description", TRUE)
+#' sparkR.session()
+#' setJobGroup("myJobGroup", "My job group description", TRUE)
 #'}
 #' @note setJobGroup since 1.5.0
-setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+#' @method setJobGroup default
+setJobGroup.default <- function(groupId, description, interruptOnCancel) {
+  sc <- getSparkContext()
   callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel)
 }
 
+setJobGroup <- function(sc, groupId, description, interruptOnCancel) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("setJobGroup(groupId, description, interruptOnCancel)",
+                old = "setJobGroup(sc, groupId, description, interruptOnCancel)")
+    setJobGroup.default(groupId, description, interruptOnCancel)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    descriptionToUse <- groupId
+    interruptOnCancelToUse <- description
+    setJobGroup.default(groupIdToUse, descriptionToUse, interruptOnCancelToUse)
+  }
+}
+
 #' Clear current job group ID and its description
 #'
-#' @param sc existing spark context
+#' @rdname clearJobGroup
+#' @name clearJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' clearJobGroup(sc)
+#' sparkR.session()
+#' clearJobGroup()
 #'}
 #' @note clearJobGroup since 1.5.0
-clearJobGroup <- function(sc) {
+#' @method clearJobGroup default
+clearJobGroup.default <- function() {
+  sc <- getSparkContext()
   callJMethod(sc, "clearJobGroup")
 }
 
+clearJobGroup <- function(sc) {
+  if (!missing(sc) &&
+      class(sc) == "jobj" &&
+      any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("clearJobGroup()", old = "clearJobGroup(sc)")
+  }
+  clearJobGroup.default()
+}
+
+
 #' Cancel active jobs for the specified group
 #'
-#' @param sc existing spark context
 #' @param groupId the ID of job group to be cancelled
+#' @rdname cancelJobGroup
+#' @name cancelJobGroup
 #' @examples
 #'\dontrun{
-#' sc <- sparkR.init()
-#' cancelJobGroup(sc, "myJobGroup")
+#' sparkR.session()
+#' cancelJobGroup("myJobGroup")
 #'}
 #' @note cancelJobGroup since 1.5.0
-cancelJobGroup <- function(sc, groupId) {
+#' @method cancelJobGroup default
+cancelJobGroup.default <- function(groupId) {
+  sc <- getSparkContext()
   callJMethod(sc, "cancelJobGroup", groupId)
 }
 
+cancelJobGroup <- function(sc, groupId) {
+  if (class(sc) == "jobj" && any(grepl("JavaSparkContext", getClassName.jobj(sc)))) {
+    .Deprecated("cancelJobGroup(groupId)", old = "cancelJobGroup(sc, groupId)")
+    cancelJobGroup.default(groupId)
+  } else {
+    # Parameter order is shifted
+    groupIdToUse <- sc
+    cancelJobGroup.default(groupIdToUse)
+  }
+}
+
 sparkConfToSubmitOps <- new.env()
 sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory"
 sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path"
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index d5c062d3bcc7d460e7171c514c9297a0fd14a9a2..e75bfbf037fbbcdb0a0eb62f51e9436347e61ae8 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -685,3 +685,11 @@ launchScript <- function(script, combinedArgs, capture = FALSE) {
     system2(script, combinedArgs, wait = capture, stdout = capture)
   }
 }
+
+getSparkContext <- function() {
+ if (!exists(".sparkRjsc", envir = .sparkREnv)) { + stop("SparkR has not been initialized. Please call sparkR.session()") + } + sc <- get(".sparkRjsc", envir = .sparkREnv) + sc +} diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 3d232df566a83818ac4d3f77ac1469b617269dc8..2a1bd61b11118d2668d1da5a47f4a17c6ff7bf25 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -100,9 +100,13 @@ test_that("rdd GC across sparkR.stop", { test_that("job group functions can be called", { sc <- sparkR.sparkContext() - setJobGroup(sc, "groupId", "job description", TRUE) - cancelJobGroup(sc, "groupId") - clearJobGroup(sc) + setJobGroup("groupId", "job description", TRUE) + cancelJobGroup("groupId") + clearJobGroup() + + suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE)) + suppressWarnings(cancelJobGroup(sc, "groupId")) + suppressWarnings(clearJobGroup(sc)) sparkR.session.stop() }) diff --git a/docs/sparkr.md b/docs/sparkr.md index 9e74e4a96acdcc246f92c8796dab436cea35ddd3..32ef815eb11c45b1939f914ab21b81524c0ffeb2 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -428,3 +428,5 @@ You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-ma - The `sqlContext` parameter is no longer required for these functions: `createDataFrame`, `as.DataFrame`, `read.json`, `jsonFile`, `read.parquet`, `parquetFile`, `read.text`, `sql`, `tables`, `tableNames`, `cacheTable`, `uncacheTable`, `clearCache`, `dropTempTable`, `read.df`, `loadDF`, `createExternalTable`. - The method `registerTempTable` has been deprecated to be replaced by `createOrReplaceTempView`. - The method `dropTempTable` has been deprecated to be replaced by `dropTempView`. + - The `sc` SparkContext parameter is no longer required for these functions: `setJobGroup`, `clearJobGroup`, `cancelJobGroup` +