Add sparkR.conf() to SparkR for reading SparkSession runtime configuration.

  * R/pkg/NAMESPACE: export sparkR.conf.
  * R/pkg/R/SQLContext.R: add sparkR.conf(key, defaultValue). With no key it
    returns every config entry as a named list; with a key it returns a
    one-element list named by that key. getDefaultSqlSource() now calls it.
  * R/pkg/inst/tests/testthat/test_sparkSQL.R: exercise sparkR.conf() with a
    key and default, with no arguments, and with a key that is not set.
  * sql/.../api/r/SQLUtils.scala: add getSessionConf(), which returns
    spark.conf.getAll.asJava.

NOTE(review): line breaks and indentation in this patch were reconstructed
from a whitespace-collapsed copy. Hunk line counts match the headers, but
leading whitespace follows the usual 2-space convention and should be
confirmed against the target tree (or applied with --ignore-whitespace).
The roxygen typo "if they config" was corrected to "if the config", so the
post-image blob id on the SQLContext.R index line is presumably stale.

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 2272d8bdd52c2361c3797dbe5ef6e9d991068a72..e0ffde922dacf99ac12fc203ad3f8aa26b4a449f 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,6 +10,7 @@ export("sparkR.session")
 export("sparkR.init")
 export("sparkR.stop")
 export("sparkR.session.stop")
+export("sparkR.conf")
 export("print.jobj")
 
 export("sparkRSQL.init",
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ee3a41cacbee61a760c4a9309416266f54712893..8df73db36e956788453e1be8c2132413a63594f4 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -110,11 +110,53 @@ infer_type <- function(x) {
   }
 }
 
-getDefaultSqlSource <- function() {
+#' Get Runtime Config from the current active SparkSession
+#'
+#' Get Runtime Config from the current active SparkSession.
+#' To change SparkSession Runtime Config, please see `sparkR.session()`.
+#'
+#' @param key (optional) The key of the config to get, if omitted, all config is returned
+#' @param defaultValue (optional) The default value of the config to return if the config is not
+#'                     set, if omitted, the call fails if the config key is not set
+#' @return a list of config values with keys as their names
+#' @rdname sparkR.conf
+#' @name sparkR.conf
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' allConfigs <- sparkR.conf()
+#' masterValue <- unlist(sparkR.conf("spark.master"))
+#' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
+#' }
+#' @note sparkR.conf since 2.0.0
+sparkR.conf <- function(key, defaultValue) {
   sparkSession <- getSparkSession()
-  conf <- callJMethod(sparkSession, "conf")
-  source <- callJMethod(conf, "get", "spark.sql.sources.default", "org.apache.spark.sql.parquet")
-  source
+  if (missing(key)) {
+    m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
+    as.list(m, all.names = TRUE, sorted = TRUE)
+  } else {
+    conf <- callJMethod(sparkSession, "conf")
+    value <- if (missing(defaultValue)) {
+      tryCatch(callJMethod(conf, "get", key),
+              error = function(e) {
+                if (any(grep("java.util.NoSuchElementException", as.character(e)))) {
+                  stop(paste0("Config '", key, "' is not set"))
+                } else {
+                  stop(paste0("Unknown error: ", as.character(e)))
+                }
+              })
+    } else {
+      callJMethod(conf, "get", key, defaultValue)
+    }
+    l <- setNames(list(value), key)
+    l
+  }
+}
+
+getDefaultSqlSource <- function() {
+  l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
+  l[["spark.sql.sources.default"]]
 }
 
 #' Create a SparkDataFrame
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9378c7afac8bd1e7f1eb5cf0e2a31f4dbc6d8111..74def5ce4245dab7af1148e37bc9bd08ee64e24c 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2365,7 +2365,7 @@ test_that("randomSplit", {
   expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
 })
 
-test_that("Change config on SparkSession", {
+test_that("Setting and getting config on SparkSession", {
   # first, set it to a random but known value
   conf <- callJMethod(sparkSession, "conf")
   property <- paste0("spark.testing.", as.character(runif(1)))
@@ -2378,17 +2378,17 @@ test_that("Change config on SparkSession", {
   names(l) <- property
   sparkR.session(sparkConfig = l)
 
-  conf <- callJMethod(sparkSession, "conf")
-  newValue <- callJMethod(conf, "get", property, "")
+  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
   expect_equal(value2, newValue)
 
   value <- as.character(runif(1))
   sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
-  conf <- callJMethod(sparkSession, "conf")
-  appNameValue <- callJMethod(conf, "get", "spark.app.name", "")
-  testValue <- callJMethod(conf, "get", "spark.testing.r.session.r", "")
+  allconf <- sparkR.conf()
+  appNameValue <- allconf[["spark.app.name"]]
+  testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
+  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")
 })
 
 test_that("enableHiveSupport on SparkSession", {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 0a995d2e9d180ecf4dae4783dfa39adbefed5709..7d8ea03a2791025adcb542b7f8fbee297d0773db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -71,6 +71,10 @@ private[sql] object SQLUtils extends Logging {
     }
   }
 
+  def getSessionConf(spark: SparkSession): JMap[String, String] = {
+    spark.conf.getAll.asJava
+  }
+
   def getJavaSparkContext(spark: SparkSession): JavaSparkContext = {
     new JavaSparkContext(spark.sparkContext)
   }