From 478fbc866fbfdb4439788583281863ecea14e8af Mon Sep 17 00:00:00 2001
From: Yanbo Liang <ybliang8@gmail.com>
Date: Tue, 21 Mar 2017 21:50:54 -0700
Subject: [PATCH] [SPARK-19925][SPARKR] Fix SparkR spark.getSparkFiles failure
 when called on executors.

## What changes were proposed in this pull request?
SparkR ```spark.getSparkFiles``` fails when called on executors, because the worker R process has no backend connection to the driver JVM, so ```callJStatic``` cannot be invoked there; see details at [SPARK-19925](https://issues.apache.org/jira/browse/SPARK-19925). This patch makes ```RRunner``` export the SparkFiles root directory to R workers via the ```SPARKR_SPARKFILES_ROOT_DIR``` environment variable (together with a ```SPARKR_IS_RUNNING_ON_WORKER``` flag), and has ```spark.getSparkFiles``` and ```spark.getSparkFilesRootDirectory``` branch on that flag: on the driver they still resolve paths through the JVM, while on workers they resolve them from the environment.
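
A minimal sketch of the behavior (assuming an active SparkR session; the file name and contents are illustrative):

```r
# Minimal sketch, assuming sparkR.session() has already been called.
# The file name and contents are illustrative.
filename <- "words.txt"
path <- file.path(tempdir(), filename)
writeLines(c("Hello", "Spark"), path)
spark.addFile(path)

# On the driver this goes through callJStatic and has always worked.
spark.getSparkFiles(filename)

# Inside spark.lapply the function runs on executors; before this patch the
# call failed there, and with this patch it resolves the path from the
# SPARKR_SPARKFILES_ROOT_DIR environment variable set by RRunner.
spark.lapply(1:3, function(i) spark.getSparkFiles(filename))
```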

## How was this patch tested?
Added unit tests, and verified the fix on standalone and YARN clusters.
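
The cluster check was along these lines (a sketch; the master URL and file path are placeholders):

```r
# Sketch of the manual verification; master URL and file path are placeholders.
sparkR.session(master = "yarn")
spark.addFile("/tmp/data.txt")

# Each element is processed on an executor; every returned path should end in
# the added file's name and point into that executor's SparkFiles root.
paths <- spark.lapply(1:2, function(i) spark.getSparkFiles("data.txt"))
sapply(paths, basename)  # "data.txt" "data.txt"
```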

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #17274 from yanboliang/spark-19925.
---
 R/pkg/R/context.R                                | 16 ++++++++++++++--
 R/pkg/inst/tests/testthat/test_context.R         |  7 +++++++
 .../scala/org/apache/spark/api/r/RRunner.scala   |  2 ++
 3 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 1ca573e5bd..50856e3d98 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -330,7 +330,13 @@ spark.addFile <- function(path, recursive = FALSE) {
 #'}
 #' @note spark.getSparkFilesRootDirectory since 2.1.0
 spark.getSparkFilesRootDirectory <- function() {
-  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+  } else {
+    # Running on worker.
+    Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR")
+  }
 }
 
 #' Get the absolute path of a file added through spark.addFile.
@@ -345,7 +351,13 @@ spark.getSparkFilesRootDirectory <- function() {
 #'}
 #' @note spark.getSparkFiles since 2.1.0
 spark.getSparkFiles <- function(fileName) {
-  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
+    # Running on driver.
+    callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+  } else {
+    # Running on worker.
+    file.path(spark.getSparkFilesRootDirectory(), as.character(fileName))
+  }
 }
 
 #' Run a function over a list of elements, distributing the computations with Spark
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index caca069339..c847113491 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -177,6 +177,13 @@ test_that("add and get file to be downloaded with Spark job on every node", {
   spark.addFile(path)
   download_path <- spark.getSparkFiles(filename)
   expect_equal(readLines(download_path), words)
+
+  # Test that spark.getSparkFiles works on executors.
+  seq <- seq(from = 1, to = 10, length.out = 5)
+  f <- function(seq) { spark.getSparkFiles(filename) }
+  results <- spark.lapply(seq, f)
+  for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
+
   unlink(path)
 
   # Test add directory recursively.
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 29e21b3b1a..8811839200 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -347,6 +347,8 @@ private[r] object RRunner {
     pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
     pb.environment().put("SPARKR_WORKER_PORT", port.toString)
     pb.environment().put("SPARKR_BACKEND_CONNECTION_TIMEOUT", rConnectionTimeout.toString)
+    pb.environment().put("SPARKR_SPARKFILES_ROOT_DIR", SparkFiles.getRootDirectory())
+    pb.environment().put("SPARKR_IS_RUNNING_ON_WORKER", "TRUE")
     pb.redirectErrorStream(true)  // redirect stderr into stdout
     val proc = pb.start()
     val errThread = startStdoutThread(proc)
-- 
GitLab