From c133907c5d9a6e6411b896b5e0cff48b2beff09f Mon Sep 17 00:00:00 2001
From: Yanbo Liang <ybliang8@gmail.com>
Date: Wed, 21 Sep 2016 20:08:28 -0700
Subject: [PATCH] [SPARK-17577][SPARKR][CORE] SparkR support add files to Spark
 job and get by executors

## What changes were proposed in this pull request?
Scala/Python users can add files to Spark job by submit options ```--files``` or ```SparkContext.addFile()```. Meanwhile, users can get the added file by ```SparkFiles.get(filename)```.
We should also support this function for SparkR users, since they also have the requirements for some shared dependency files. For example, SparkR users can download third party R packages to driver firstly, add these files to the Spark job as dependency by this API and then each executor can install these packages by ```install.packages```.

## How was this patch tested?
Add unit test.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #15131 from yanboliang/spark-17577.
---
 R/pkg/NAMESPACE                               |  3 ++
 R/pkg/R/context.R                             | 48 +++++++++++++++++++
 R/pkg/inst/tests/testthat/test_context.R      | 13 +++++
 .../scala/org/apache/spark/SparkContext.scala |  6 +--
 4 files changed, 67 insertions(+), 3 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index a5e9cbdc37..267a38c215 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -336,6 +336,9 @@ export("as.DataFrame",
        "read.parquet",
        "read.text",
        "spark.lapply",
+       "spark.addFile",
+       "spark.getSparkFilesRootDirectory",
+       "spark.getSparkFiles",
        "sql",
        "str",
        "tableToDF",
diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R
index 13ade49eab..4793578ad6 100644
--- a/R/pkg/R/context.R
+++ b/R/pkg/R/context.R
@@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) {
   invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
 }
 
+#' Add a file or directory to be downloaded with this Spark job on every node.
+#'
+#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
+#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+#' use spark.getSparkFiles(fileName) to find its download location.
+#'
+#' @rdname spark.addFile
+#' @param path The path of the file to be added
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.addFile("~/myfile")
+#'}
+#' @note spark.addFile since 2.1.0
+spark.addFile <- function(path) {
+  sc <- getSparkContext()
+  invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path))))
+}
+
+#' Get the root directory that contains files added through spark.addFile.
+#'
+#' @rdname spark.getSparkFilesRootDirectory
+#' @return the root directory that contains files added through spark.addFile
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFilesRootDirectory()
+#'}
+#' @note spark.getSparkFilesRootDirectory since 2.1.0
+spark.getSparkFilesRootDirectory <- function() {
+  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
+}
+
+#' Get the absolute path of a file added through spark.addFile.
+#'
+#' @rdname spark.getSparkFiles
+#' @param fileName The name of the file added through spark.addFile
+#' @return the absolute path of a file added through spark.addFile.
+#' @export
+#' @examples
+#'\dontrun{
+#' spark.getSparkFiles("myfile")
+#'}
+#' @note spark.getSparkFiles since 2.1.0
+spark.getSparkFiles <- function(fileName) {
+  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
+}
+
 #' Run a function over a list of elements, distributing the computations with Spark
 #'
 #' Run a function over a list of elements, distributing the computations with Spark. Applies a
diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R
index 1ab7f319df..0495418bb7 100644
--- a/R/pkg/inst/tests/testthat/test_context.R
+++ b/R/pkg/inst/tests/testthat/test_context.R
@@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", {
   expect_equal(doubled, as.list(2 * 1:10))
   sparkR.session.stop()
 })
+
+test_that("add and get file to be downloaded with Spark job on every node", {
+  sparkR.sparkContext()
+  path <- tempfile(pattern = "hello", fileext = ".txt")
+  filename <- basename(path)
+  words <- "Hello World!"
+  writeLines(words, path)
+  spark.addFile(path)
+  download_path <- spark.getSparkFiles(filename)
+  expect_equal(readLines(download_path), words)
+  unlink(path)
+  sparkR.session.stop()
+})
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index db84172e16..1981ad5671 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1427,7 +1427,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new URI(path)
+    val uri = new Path(path).toUri
     val schemeCorrectedPath = uri.getScheme match {
       case null | "local" => new File(path).getCanonicalFile.toURI.toString
       case _ => path
@@ -1458,8 +1458,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver can still use the
       // SparkFiles API to access files.
-      Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-        hadoopConfiguration, timestamp, useCache = false)
+      Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
+        env.securityManager, hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
     }
   }
-- 
GitLab