From 30b182bcc088aef161585211c517f473b9ee6632 Mon Sep 17 00:00:00 2001
From: Felix Cheung <felixcheung_m@hotmail.com>
Date: Sun, 26 Jun 2016 13:10:43 -0700
Subject: [PATCH] [SPARK-16184][SPARKR] conf API for SparkSession

## What changes were proposed in this pull request?

Add a `sparkR.conf()` function to get the Runtime Config from the current SparkSession

## How was this patch tested?

unit tests, manual tests

This is how it works in the SparkR shell:
```
 SparkSession available as 'spark'.
> conf()
$hive.metastore.warehouse.dir
[1] "file:/opt/spark-2.0.0-bin-hadoop2.6/R/spark-warehouse"

$spark.app.id
[1] "local-1466749575523"

$spark.app.name
[1] "SparkR"

$spark.driver.host
[1] "10.0.2.1"

$spark.driver.port
[1] "45629"

$spark.executorEnv.LD_LIBRARY_PATH
[1] "$LD_LIBRARY_PATH:/usr/lib/R/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/default-java/jre/lib/amd64/server"

$spark.executor.id
[1] "driver"

$spark.home
[1] "/opt/spark-2.0.0-bin-hadoop2.6"

$spark.master
[1] "local[*]"

$spark.sql.catalogImplementation
[1] "hive"

$spark.submit.deployMode
[1] "client"

> conf("spark.master")
$spark.master
[1] "local[*]"

```
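
For reference, a minimal sketch of how the new API is meant to be called from R code (assuming a SparkSession has already been started; the `spark.sql.shuffle.partitions` key and the `"200"` default below are only illustrative):

```
library(SparkR)
sparkR.session()

# all configs, returned as a named list
allConfigs <- sparkR.conf()

# a single config; unlist() to get the bare character value
master <- unlist(sparkR.conf("spark.master"))

# with a default value used when the key is not set
partitions <- sparkR.conf("spark.sql.shuffle.partitions", "200")

# asking for an unset key without a default raises an error:
# "Config '<key>' is not set"
```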

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #13885 from felixcheung/rconf.
---
 R/pkg/NAMESPACE                               |  1 +
 R/pkg/R/SQLContext.R                          | 50 +++++++++++++++++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R     | 12 ++---
 .../org/apache/spark/sql/api/r/SQLUtils.scala |  4 ++
 4 files changed, 57 insertions(+), 10 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 2272d8bdd5..e0ffde922d 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -10,6 +10,7 @@ export("sparkR.session")
 export("sparkR.init")
 export("sparkR.stop")
 export("sparkR.session.stop")
+export("sparkR.conf")
 export("print.jobj")
 
 export("sparkRSQL.init",
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index ee3a41cacb..8df73db36e 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -110,11 +110,53 @@ infer_type <- function(x) {
   }
 }
 
-getDefaultSqlSource <- function() {
+#' Get Runtime Config from the current active SparkSession
+#'
+#' Get Runtime Config from the current active SparkSession.
+#' To change SparkSession Runtime Config, please see `sparkR.session()`.
+#'
+#' @param key (optional) The key of the config to get; if omitted, all configs are returned
+#' @param defaultValue (optional) The default value of the config to return if the config is not
+#' set; if omitted, the call fails if the config key is not set
+#' @return a list of config values with keys as their names
+#' @rdname sparkR.conf
+#' @name sparkR.conf
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' allConfigs <- sparkR.conf()
+#' masterValue <- unlist(sparkR.conf("spark.master"))
+#' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
+#' }
+#' @note sparkR.conf since 2.0.0
+sparkR.conf <- function(key, defaultValue) {
   sparkSession <- getSparkSession()
-  conf <- callJMethod(sparkSession, "conf")
-  source <- callJMethod(conf, "get", "spark.sql.sources.default", "org.apache.spark.sql.parquet")
-  source
+  if (missing(key)) {
+    m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
+    as.list(m, all.names = TRUE, sorted = TRUE)
+  } else {
+    conf <- callJMethod(sparkSession, "conf")
+    value <- if (missing(defaultValue)) {
+      tryCatch(callJMethod(conf, "get", key),
+              error = function(e) {
+                if (any(grep("java.util.NoSuchElementException", as.character(e)))) {
+                  stop(paste0("Config '", key, "' is not set"))
+                } else {
+                  stop(paste0("Unknown error: ", as.character(e)))
+                }
+              })
+    } else {
+      callJMethod(conf, "get", key, defaultValue)
+    }
+    l <- setNames(list(value), key)
+    l
+  }
+}
+
+getDefaultSqlSource <- function() {
+  l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
+  l[["spark.sql.sources.default"]]
 }
 
 #' Create a SparkDataFrame
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9378c7afac..74def5ce42 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2365,7 +2365,7 @@ test_that("randomSplit", {
   expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
 })
 
-test_that("Change config on SparkSession", {
+test_that("Setting and getting config on SparkSession", {
   # first, set it to a random but known value
   conf <- callJMethod(sparkSession, "conf")
   property <- paste0("spark.testing.", as.character(runif(1)))
@@ -2378,17 +2378,17 @@ test_that("Change config on SparkSession", {
   names(l) <- property
   sparkR.session(sparkConfig = l)
 
-  conf <- callJMethod(sparkSession, "conf")
-  newValue <- callJMethod(conf, "get", property, "")
+  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
   expect_equal(value2, newValue)
 
   value <- as.character(runif(1))
   sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
-  conf <- callJMethod(sparkSession, "conf")
-  appNameValue <- callJMethod(conf, "get", "spark.app.name", "")
-  testValue <- callJMethod(conf, "get", "spark.testing.r.session.r", "")
+  allconf <- sparkR.conf()
+  appNameValue <- allconf[["spark.app.name"]]
+  testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
+  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")
 })
 
 test_that("enableHiveSupport on SparkSession", {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 0a995d2e9d..7d8ea03a27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -71,6 +71,10 @@ private[sql] object SQLUtils extends Logging {
     }
   }
 
+  def getSessionConf(spark: SparkSession): JMap[String, String] = {
+    spark.conf.getAll.asJava
+  }
+
   def getJavaSparkContext(spark: SparkSession): JavaSparkContext = {
     new JavaSparkContext(spark.sparkContext)
   }
-- 
GitLab