Commit 277ed375 authored by Yanbo Liang

[SPARK-19925][SPARKR] Fix SparkR spark.getSparkFiles failure when called on executors.

## What changes were proposed in this pull request?
SparkR ```spark.getSparkFiles``` fails when called on executors; see details at [SPARK-19925](https://issues.apache.org/jira/browse/SPARK-19925).

## How was this patch tested?
Added unit tests, and verified the fix on standalone and YARN clusters.
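
For reference, the failure mode the new test exercises can be sketched as follows; the session setup and file contents are illustrative, not part of the patch:

```r
library(SparkR)
sparkR.session()  # any deploy mode; the bug was reported on standalone and YARN

# Ship a file to every node.
path <- tempfile(pattern = "hello", fileext = ".txt")
writeLines("Hello World!", path)
spark.addFile(path)

# Resolve the file's path from executors. Before this patch the worker-side
# call failed, because the R worker has no backend connection for callJStatic;
# with the patch it returns the worker-local path under the SparkFiles root.
paths <- spark.lapply(1:2, function(i) { spark.getSparkFiles(basename(path)) })
unlist(paths)
```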

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17274 from yanboliang/spark-19925.

(cherry picked from commit 478fbc86)
Signed-off-by: Yanbo Liang <ybliang8@gmail.com>
parent c4d2b833
@@ -330,7 +330,13 @@ spark.addFile <- function(path, recursive = FALSE) {
 #'}
 #' @note spark.getSparkFilesRootDirectory since 2.1.0
 spark.getSparkFilesRootDirectory <- function() {
-  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  } else {
+    # Running on worker.
+    Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR")
+  }
 }
 
 #' Get the absolute path of a file added through spark.addFile.
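
The branch above dispatches on an environment variable: on the driver the JVM is reachable through the SparkR backend, so `callJStatic` works, while an R worker process has no backend connection and instead reads the value that `RRunner` exported before launching it. A minimal sketch of the same check, with a hypothetical helper name:

```r
# Hypothetical helper mirroring the dispatch above. RRunner sets
# SPARKR_IS_RUNNING_ON_WORKER to "TRUE" before starting the R worker,
# so an empty value means we are running on the driver.
is_spark_worker <- function() {
  nzchar(Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER"))
}

is_spark_worker()  # FALSE in a driver session, TRUE inside a worker
```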
@@ -345,7 +351,13 @@ spark.getSparkFilesRootDirectory <- function() {
 #'}
 #' @note spark.getSparkFiles since 2.1.0
 spark.getSparkFiles <- function(fileName) {
-  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  } else {
+    # Running on worker.
+    file.path(spark.getSparkFilesRootDirectory(), as.character(fileName))
+  }
 }
 
 #' Run a function over a list of elements, distributing the computations with Spark
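On a worker, `spark.getSparkFiles` now composes the path purely in R; this matches what `SparkFiles.get` does on the JVM side, which joins the root directory with the file name. With illustrative values:

```r
# Worker-side resolution after the patch (paths are illustrative).
root <- "/tmp/spark-0a1b/userFiles-2c3d"  # what SPARKR_SPARKFILES_ROOT_DIR holds
file.path(root, "hello.txt")
#> [1] "/tmp/spark-0a1b/userFiles-2c3d/hello.txt"
```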
@@ -177,6 +177,13 @@ test_that("add and get file to be downloaded with Spark job on every node", {
   spark.addFile(path)
   download_path <- spark.getSparkFiles(filename)
   expect_equal(readLines(download_path), words)
+
+  # Test spark.getSparkFiles works well on executors.
+  seq <- seq(from = 1, to = 10, length.out = 5)
+  f <- function(seq) { spark.getSparkFiles(filename) }
+  results <- spark.lapply(seq, f)
+  for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
+
   unlink(path)
 
   # Test add directory recursively.
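The new test drives `spark.getSparkFiles` on executors through `spark.lapply`, which serializes the function, runs it over the list elements in a Spark job, and collects the results back to the driver. For example (assumes an active SparkR session):

```r
sparkR.session()
# Each element is processed on an executor; results come back as a list.
squares <- spark.lapply(1:5, function(x) { x * x })
unlist(squares)
#> [1]  1  4  9 16 25
```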
@@ -347,6 +347,8 @@ private[r] object RRunner {
     pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
     pb.environment().put("SPARKR_WORKER_PORT", port.toString)
     pb.environment().put("SPARKR_BACKEND_CONNECTION_TIMEOUT", rConnectionTimeout.toString)
+    pb.environment().put("SPARKR_SPARKFILES_ROOT_DIR", SparkFiles.getRootDirectory())
+    pb.environment().put("SPARKR_IS_RUNNING_ON_WORKER", "TRUE")
     pb.redirectErrorStream(true) // redirect stderr into stdout
     val proc = pb.start()
     val errThread = startStdoutThread(proc)
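These two `ProcessBuilder` calls are what make the R-side branch possible: the JVM that launches the R worker exports the SparkFiles root directory and a worker flag into the child's environment. The same parent-to-child propagation can be sketched in plain R on a Unix-like system (path illustrative):

```r
# Stand-in for the JVM's pb.environment().put(...): set a variable in the
# parent, then observe that a child process inherits it, just as the R
# worker inherits SPARKR_SPARKFILES_ROOT_DIR from RRunner.
Sys.setenv(SPARKR_SPARKFILES_ROOT_DIR = "/tmp/spark-demo/userFiles")
system("echo $SPARKR_SPARKFILES_ROOT_DIR")
#> /tmp/spark-demo/userFiles
```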