Commit 8a51cfdc authored by Felix Cheung, committed by Shivaram Venkataraman

[SPARK-18810][SPARKR] SparkR install.spark does not work for RCs, snapshots

## What changes were proposed in this pull request?

Support overriding the download URL (including the version directory) via an environment variable, `SPARKR_RELEASE_DOWNLOAD_URL`.

## How was this patch tested?

Unit tests and manual testing:
- snapshot build url
  - download when spark jar not cached
  - when spark jar is cached
- RC build url
  - download when spark jar not cached
  - when spark jar is cached
- multiple cached spark versions
- starting with sparkR shell

To use this,
```
SPARKR_RELEASE_DOWNLOAD_URL=http://this_is_the_url_to_spark_release_tgz R
```
then in R,
```
library(SparkR) # or specify lib.loc
sparkR.session()
```
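
The variable is read with `Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")` (see the first hunk below), so as an alternative to exporting it before launching R, it can also be set from within the session. A minimal sketch, reusing the same placeholder URL:
```
# Alternative (illustrative): set the override inside R before the session starts.
Sys.setenv(SPARKR_RELEASE_DOWNLOAD_URL = "http://this_is_the_url_to_spark_release_tgz")
library(SparkR)   # or specify lib.loc
sparkR.session()  # install.spark picks up the override when it downloads Spark
```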

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16248 from felixcheung/rinstallurl.
parent 90abfd15
```
@@ -79,19 +79,28 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
     dir.create(localDir, recursive = TRUE)
   }
-  packageLocalDir <- file.path(localDir, packageName)
-
   if (overwrite) {
     message(paste0("Overwrite = TRUE: download and overwrite the tar file",
                    "and Spark package directory if they exist."))
   }
 
+  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
+  if (releaseUrl != "") {
+    packageName <- basenameSansExtFromUrl(releaseUrl)
+  }
+
+  packageLocalDir <- file.path(localDir, packageName)
+
   # can use dir.exists(packageLocalDir) under R 3.2.0 or later
   if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
-    fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
-    msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
-                   packageLocalDir)
-    message(msg)
+    if (releaseUrl != "") {
+      message(paste(packageName, "found, setting SPARK_HOME to", packageLocalDir))
+    } else {
+      fmt <- "%s for Hadoop %s found, setting SPARK_HOME to %s"
+      msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
+                     packageLocalDir)
+      message(msg)
+    }
     Sys.setenv(SPARK_HOME = packageLocalDir)
     return(invisible(packageLocalDir))
   } else {
```
```
@@ -104,7 +113,12 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   if (tarExists && !overwrite) {
     message("tar file found.")
   } else {
-    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    if (releaseUrl != "") {
+      message("Downloading from alternate URL:\n- ", releaseUrl)
+      downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
+    } else {
+      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    }
   }
 
   message(sprintf("Installing to %s", localDir))
```
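
Taken together, the two hunks above reduce to a simple branch: when `SPARKR_RELEASE_DOWNLOAD_URL` is set, the package name is derived from that URL and the tarball is fetched directly; otherwise the existing Apache mirror logic runs. A condensed sketch of that decision (the wrapper function name is illustrative, not part of the patch):
```
# Condensed, illustrative sketch of the download decision inside install.spark
resolveSparkDownload <- function(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath) {
  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
  if (releaseUrl != "") {
    # RC/snapshot case: use the user-supplied URL as-is
    downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl))
  } else {
    # released-version case: resolve a mirror and download from it
    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
  }
}
```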
```
@@ -182,16 +196,18 @@ getPreferredMirror <- function(version, packageName) {
 }
 
 directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
-  packageRemotePath <- paste0(
-    file.path(mirrorUrl, version, packageName), ".tgz")
+  packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
   fmt <- "Downloading %s for Hadoop %s from:\n- %s"
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
+  downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl))
+}
 
-  isFail <- tryCatch(download.file(packageRemotePath, packageLocalPath),
+downloadUrl <- function(remotePath, localPath, errorMessage) {
+  isFail <- tryCatch(download.file(remotePath, localPath),
                      error = function(e) {
-                       message(sprintf("Fetch failed from %s", mirrorUrl))
+                       message(errorMessage)
                        print(e)
                        TRUE
                      })
```
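
The `downloadUrl` body above is cut off at the `tryCatch`; the rest of the helper is collapsed in this view. As a self-contained illustration of the pattern being factored out of `directDownloadTar` (the return-value handling here is an assumption, not the patched code):
```
# Stand-alone, illustrative version of the tryCatch-wrapped download pattern.
# The real helper is named downloadUrl; the success/failure handling below is assumed.
tryDownload <- function(remotePath, localPath, errorMessage) {
  isFail <- tryCatch({
    download.file(remotePath, localPath)
    FALSE  # download.file returned without error
  }, error = function(e) {
    message(errorMessage)
    print(e)
    TRUE
  })
  invisible(!isFail)  # TRUE when the file was fetched successfully
}
```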
```
@@ -841,7 +841,7 @@ captureJVMException <- function(e, method) {
 #
 # @param inputData a list of rows, with each row a list
 # @return data.frame with raw columns as lists
-rbindRaws <- function(inputData){
+rbindRaws <- function(inputData) {
   row1 <- inputData[[1]]
   rawcolumns <- ("raw" == sapply(row1, class))
```
```
@@ -851,3 +851,15 @@ rbindRaws <- function(inputData){
   out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
   out
 }
+
+# Get basename without extension from URL
+basenameSansExtFromUrl <- function(url) {
+  # split by '/'
+  splits <- unlist(strsplit(url, "^.+/"))
+  last <- tail(splits, 1)
+  # this is from file_path_sans_ext
+  # first, remove any compression extension
+  filename <- sub("[.](gz|bz2|xz)$", "", last)
+  # then, strip extension by the last '.'
+  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename)
+}
```
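
Worked through by hand on the snapshot URL used in the tests below, the new helper behaves as follows (shown as an interactive session):
```
url <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/",
              "spark-2.1.1-SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/",
              "spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
# 1. "^.+/" greedily matches everything up to the last '/', so the split
#    leaves the bare file name as the last element
last <- tail(unlist(strsplit(url, "^.+/")), 1)
last
#> [1] "spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz"
# 2. there is no ".gz"/".bz2"/".xz" suffix here, so the compression-stripping sub() is a no-op
filename <- sub("[.](gz|bz2|xz)$", "", last)
# 3. the final sub() drops the last ".<ext>" (".tgz"), leaving the package name
sub("([^.]+)\\.[[:alnum:]]+$", "\\1", filename)
#> [1] "spark-2.1.1-SNAPSHOT-bin-hadoop2.7"
```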
```
@@ -228,4 +228,15 @@ test_that("varargsToStrEnv", {
   expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
 })
 
+test_that("basenameSansExtFromUrl", {
+  x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-",
+              "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
+  y <- paste0("http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-bin/spark-2.1.0-",
+              "bin-hadoop2.4-without-hive.tgz")
+  expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7")
+  expect_equal(basenameSansExtFromUrl(y), "spark-2.1.0-bin-hadoop2.4-without-hive")
+  z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz"
+  expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
+})
+
 sparkR.session.stop()
```