diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index a5e9cbdc37f06a3f4767c8e0662d0dc67fa1ef13..267a38c21530b8fa349f2b4118862d3cf13568d6 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -336,6 +336,9 @@ export("as.DataFrame", "read.parquet", "read.text", "spark.lapply", + "spark.addFile", + "spark.getSparkFilesRootDirectory", + "spark.getSparkFiles", "sql", "str", "tableToDF", diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 13ade49eabfa642c48be60501544d92eb1b1744c..4793578ad684eb7a526f0936b282552cfdf2e7f1 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -225,6 +225,54 @@ setCheckpointDir <- function(sc, dirName) { invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName)))) } +#' Add a file or directory to be downloaded with this Spark job on every node. +#' +#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported +#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, +#' use spark.getSparkFiles(fileName) to find its download location. +#' +#' @rdname spark.addFile +#' @param path The path of the file to be added +#' @export +#' @examples +#'\dontrun{ +#' spark.addFile("~/myfile") +#'} +#' @note spark.addFile since 2.1.0 +spark.addFile <- function(path) { + sc <- getSparkContext() + invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)))) +} + +#' Get the root directory that contains files added through spark.addFile. +#' +#' @rdname spark.getSparkFilesRootDirectory +#' @return the root directory that contains files added through spark.addFile +#' @export +#' @examples +#'\dontrun{ +#' spark.getSparkFilesRootDirectory() +#'} +#' @note spark.getSparkFilesRootDirectory since 2.1.0 +spark.getSparkFilesRootDirectory <- function() { + callJStatic("org.apache.spark.SparkFiles", "getRootDirectory") +} + +#' Get the absolute path of a file added through spark.addFile. +#' +#' @rdname spark.getSparkFiles +#' @param fileName The name of the file added through spark.addFile +#' @return the absolute path of a file added through spark.addFile. +#' @export +#' @examples +#'\dontrun{ +#' spark.getSparkFiles("myfile") +#'} +#' @note spark.getSparkFiles since 2.1.0 +spark.getSparkFiles <- function(fileName) { + callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName)) +} + #' Run a function over a list of elements, distributing the computations with Spark #' #' Run a function over a list of elements, distributing the computations with Spark. Applies a diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 1ab7f319df9fff27d2bc68edb4ba126f762cb195..0495418bb77798451f1fc4303e515710bdcb811a 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -166,3 +166,16 @@ test_that("spark.lapply should perform simple transforms", { expect_equal(doubled, as.list(2 * 1:10)) sparkR.session.stop() }) + +test_that("add and get file to be downloaded with Spark job on every node", { + sparkR.sparkContext() + path <- tempfile(pattern = "hello", fileext = ".txt") + filename <- basename(path) + words <- "Hello World!" + writeLines(words, path) + spark.addFile(path) + download_path <- spark.getSparkFiles(filename) + expect_equal(readLines(download_path), words) + unlink(path) + sparkR.session.stop() +}) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index db84172e1680ae4def64a76e1305e753dc781cbb..1981ad5671093c9f68ec1af94e9f0955182170a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1427,7 +1427,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * supported for Hadoop-supported filesystems. */ def addFile(path: String, recursive: Boolean): Unit = { - val uri = new URI(path) + val uri = new Path(path).toUri val schemeCorrectedPath = uri.getScheme match { case null | "local" => new File(path).getCanonicalFile.toURI.toString case _ => path @@ -1458,8 +1458,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli logInfo(s"Added file $path at $key with timestamp $timestamp") // Fetch the file locally so that closures which are run on the driver can still use the // SparkFiles API to access files. - Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager, - hadoopConfiguration, timestamp, useCache = false) + Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf, + env.securityManager, hadoopConfiguration, timestamp, useCache = false) postEnvironmentUpdate() } }