diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index 69b0a523b84e40c7a9678521df0e905374f0df3e..097b7ad4bea086454516c80180e897970034f631 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -79,19 +79,28 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
     dir.create(localDir, recursive = TRUE)
   }
 
-  packageLocalDir <- file.path(localDir, packageName)
-
   if (overwrite) {
     message(paste0("Overwrite = TRUE: download and overwrite the tar file",
                    "and Spark package directory if they exist."))
   }
 
+  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
+  if (releaseUrl != "") {
+    packageName <- basenameSansExtFromUrl(releaseUrl)
+  }
+
+  packageLocalDir <- file.path(localDir, packageName)
+
   # can use dir.exists(packageLocalDir) under R 3.2.0 or later
   if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
-    fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
-    msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
-                   packageLocalDir)
-    message(msg)
+    if (releaseUrl != "") {
+      message(paste(packageName, "found, setting SPARK_HOME to", packageLocalDir))
+    } else {
+      fmt <- "%s for Hadoop %s found, setting SPARK_HOME to %s"
+      msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
+                     packageLocalDir)
+      message(msg)
+    }
     Sys.setenv(SPARK_HOME = packageLocalDir)
     return(invisible(packageLocalDir))
   } else {
@@ -104,7 +113,12 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   if (tarExists && !overwrite) {
     message("tar file found.")
   } else {
-    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    if (releaseUrl != "") {
+      message("Downloading from alternate URL:\n- ", releaseUrl)
+      downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
+    } else {
+      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    }
   }
 
   message(sprintf("Installing to %s", localDir))
@@ -182,16 +196,18 @@ getPreferredMirror <- function(version, packageName) {
 }
 
 directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
-  packageRemotePath <- paste0(
-    file.path(mirrorUrl, version, packageName), ".tgz")
+  packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
   fmt <- "Downloading %s for Hadoop %s from:\n- %s"
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
+  downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl))
+}
 
-  isFail <- tryCatch(download.file(packageRemotePath, packageLocalPath),
+downloadUrl <- function(remotePath, localPath, errorMessage) {
+  isFail <- tryCatch(download.file(remotePath, localPath),
                      error = function(e) {
-                       message(sprintf("Fetch failed from %s", mirrorUrl))
+                       message(errorMessage)
                        print(e)
                        TRUE
                      })
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 098c0e3e31e95ba1e468363f2ecaee435a2a9639..1283449f3592a3d8813d93788f68584faa497485 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -841,7 +841,7 @@ captureJVMException <- function(e, method) {
 #
 # @param inputData a list of rows, with each row a list
 # @return data.frame with raw columns as lists
-rbindRaws <- function(inputData){
+rbindRaws <- function(inputData) {
   row1 <- inputData[[1]]
   rawcolumns <- ("raw" == sapply(row1, class))
 
@@ -851,3 +851,15 @@ rbindRaws <- function(inputData){
   out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
   out
 }
+
+# Get basename without extension from URL
+basenameSansExtFromUrl <- function(url) {
+  # split by '/'
+  splits <- unlist(strsplit(url, "^.+/"))
+  last <- tail(splits, 1)
+  # this is from file_path_sans_ext
+  # first, remove any compression extension
+  filename <- sub("[.](gz|bz2|xz)$", "", last)
+  # then, strip extension by the last '.'
+  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename)
+}
diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R
index 607c407f04f9718c429f55ff68bc3854ec71196a..c87524842876ea912c5026411c5317a6a930b619 100644
--- a/R/pkg/inst/tests/testthat/test_utils.R
+++ b/R/pkg/inst/tests/testthat/test_utils.R
@@ -228,4 +228,15 @@ test_that("varargsToStrEnv", {
   expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
 })
 
+test_that("basenameSansExtFromUrl", {
+  x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-",
+              "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
+  y <- paste0("http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-bin/spark-2.1.0-",
+              "bin-hadoop2.4-without-hive.tgz")
+  expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7")
+  expect_equal(basenameSansExtFromUrl(y), "spark-2.1.0-bin-hadoop2.4-without-hive")
+  z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz"
+  expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
+})
+
 sparkR.session.stop()
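A quick way to exercise the new code path end to end is the snippet below. This is a minimal sketch, not part of the patch: it reuses the nightly-build URL from the test case above, which may no longer be hosted, and any valid Spark tarball URL would work the same way.

# Point install.spark() at an explicit tarball instead of resolving an
# Apache mirror. The package name, and hence SPARK_HOME, is derived from
# the URL's basename via basenameSansExtFromUrl().
Sys.setenv(SPARKR_RELEASE_DOWNLOAD_URL = paste0(
  "http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/",
  "spark-2.1.1-SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/",
  "spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz"))

library(SparkR)
install.spark()
# Expected: SPARK_HOME ends with "spark-2.1.1-SNAPSHOT-bin-hadoop2.7", and a
# second call hits the "found, setting SPARK_HOME to" branch instead of
# downloading again.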