diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 4793578ad684eb7a526f0936b282552cfdf2e7f1..fe2f3e3d10a9b311d13e479f45ec8c18a99a7c82 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -231,17 +231,22 @@ setCheckpointDir <- function(sc, dirName) { #' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, #' use spark.getSparkFiles(fileName) to find its download location. #' +#' A directory can be given if the recursive option is set to true. +#' Currently directories are only supported for Hadoop-supported filesystems. +#' Refer Hadoop-supported filesystems at \url{https://wiki.apache.org/hadoop/HCFS}. +#' #' @rdname spark.addFile #' @param path The path of the file to be added +#' @param recursive Whether to add files recursively from the path. Default is FALSE. #' @export #' @examples #'\dontrun{ #' spark.addFile("~/myfile") #'} #' @note spark.addFile since 2.1.0 -spark.addFile <- function(path) { +spark.addFile <- function(path, recursive = FALSE) { sc <- getSparkContext() - invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)))) + invisible(callJMethod(sc, "addFile", suppressWarnings(normalizePath(path)), recursive)) } #' Get the root directory that contains files added through spark.addFile. diff --git a/R/pkg/inst/tests/testthat/test_context.R b/R/pkg/inst/tests/testthat/test_context.R index 0495418bb77798451f1fc4303e515710bdcb811a..caca06933952bce4cc7df96edfc01e126c1d9129 100644 --- a/R/pkg/inst/tests/testthat/test_context.R +++ b/R/pkg/inst/tests/testthat/test_context.R @@ -169,6 +169,7 @@ test_that("spark.lapply should perform simple transforms", { test_that("add and get file to be downloaded with Spark job on every node", { sparkR.sparkContext() + # Test add file. path <- tempfile(pattern = "hello", fileext = ".txt") filename <- basename(path) words <- "Hello World!" @@ -177,5 +178,26 @@ test_that("add and get file to be downloaded with Spark job on every node", { download_path <- spark.getSparkFiles(filename) expect_equal(readLines(download_path), words) unlink(path) + + # Test add directory recursively. + path <- paste0(tempdir(), "/", "recursive_dir") + dir.create(path) + dir_name <- basename(path) + path1 <- paste0(path, "/", "hello.txt") + file.create(path1) + sub_path <- paste0(path, "/", "sub_hello") + dir.create(sub_path) + path2 <- paste0(sub_path, "/", "sub_hello.txt") + file.create(path2) + words <- "Hello World!" + sub_words <- "Sub Hello World!" + writeLines(words, path1) + writeLines(sub_words, path2) + spark.addFile(path, recursive = TRUE) + download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt")) + expect_equal(readLines(download_path1), words) + download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt")) + expect_equal(readLines(download_path2), sub_words) + unlink(path, recursive = TRUE) sparkR.session.stop() })