Skip to content
Snippets Groups Projects
Commit 77202a6c authored by Felix Cheung, committed by Felix Cheung
Browse files

[SPARK-19231][SPARKR] add error handling for download and untar for Spark release


## What changes were proposed in this pull request?

When SparkR is starting as a package and it needs to download the Spark release distribution, we need to handle errors for the download and untar steps and clean up afterwards; otherwise it will get stuck.

## How was this patch tested?

manually

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16589 from felixcheung/rtarreturncode.

(cherry picked from commit 278fa1eb)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
parent 29b954bb
No related branches found
No related tags found
No related merge requests found
...@@ -54,7 +54,7 @@ ...@@ -54,7 +54,7 @@
#' } #' }
#' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
#' and force re-install Spark (in case the local directory or file is corrupted) #' and force re-install Spark (in case the local directory or file is corrupted)
#' @return \code{install.spark} returns the local directory where Spark is found or installed #' @return the (invisible) local directory where Spark is found or installed
#' @rdname install.spark #' @rdname install.spark
#' @name install.spark #' @name install.spark
#' @aliases install.spark #' @aliases install.spark
...@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, ...@@ -115,17 +115,35 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
} else { } else {
if (releaseUrl != "") { if (releaseUrl != "") {
message("Downloading from alternate URL:\n- ", releaseUrl) message("Downloading from alternate URL:\n- ", releaseUrl)
downloadUrl(releaseUrl, packageLocalPath, paste0("Fetch failed from ", releaseUrl)) success <- downloadUrl(releaseUrl, packageLocalPath)
if (!success) {
unlink(packageLocalPath)
stop(paste0("Fetch failed from ", releaseUrl))
}
} else { } else {
robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
} }
} }
message(sprintf("Installing to %s", localDir)) message(sprintf("Installing to %s", localDir))
untar(tarfile = packageLocalPath, exdir = localDir) # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
if (!tarExists || overwrite) { # or, tar command can return failure code
success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
error = function(e) {
message(e)
message()
FALSE
},
warning = function(w) {
# Treat warning as error, add an empty line with message()
message(w)
message()
FALSE
})
if (!tarExists || overwrite || !success) {
unlink(packageLocalPath) unlink(packageLocalPath)
} }
if (!success) stop("Extract archive failed.")
message("DONE.") message("DONE.")
Sys.setenv(SPARK_HOME = packageLocalDir) Sys.setenv(SPARK_HOME = packageLocalDir)
message(paste("SPARK_HOME set to", packageLocalDir)) message(paste("SPARK_HOME set to", packageLocalDir))
...@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL, ...@@ -135,8 +153,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) { robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
# step 1: use user-provided url # step 1: use user-provided url
if (!is.null(mirrorUrl)) { if (!is.null(mirrorUrl)) {
msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl) message("Use user-provided mirror site: ", mirrorUrl)
message(msg)
success <- directDownloadTar(mirrorUrl, version, hadoopVersion, success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath) packageName, packageLocalPath)
if (success) { if (success) {
...@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa ...@@ -156,7 +173,7 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
packageName, packageLocalPath) packageName, packageLocalPath)
if (success) return() if (success) return()
} else { } else {
message("Unable to find preferred mirror site.") message("Unable to download from preferred mirror site: ", mirrorUrl)
} }
# step 3: use backup option # step 3: use backup option
...@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa ...@@ -165,8 +182,11 @@ robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
success <- directDownloadTar(mirrorUrl, version, hadoopVersion, success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
packageName, packageLocalPath) packageName, packageLocalPath)
if (success) { if (success) {
return(packageLocalPath) return()
} else { } else {
# remove any partially downloaded file
unlink(packageLocalPath)
message("Unable to download from default mirror site: ", mirrorUrl)
msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.", msg <- sprintf(paste("Unable to download Spark %s for Hadoop %s.",
"Please check network connection, Hadoop version,", "Please check network connection, Hadoop version,",
"or provide other mirror sites."), "or provide other mirror sites."),
...@@ -201,14 +221,20 @@ directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa ...@@ -201,14 +221,20 @@ directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, pa
msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion), msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
packageRemotePath) packageRemotePath)
message(msg) message(msg)
downloadUrl(packageRemotePath, packageLocalPath, paste0("Fetch failed from ", mirrorUrl)) downloadUrl(packageRemotePath, packageLocalPath)
} }
downloadUrl <- function(remotePath, localPath, errorMessage) { downloadUrl <- function(remotePath, localPath) {
isFail <- tryCatch(download.file(remotePath, localPath), isFail <- tryCatch(download.file(remotePath, localPath),
error = function(e) { error = function(e) {
message(errorMessage) message(e)
print(e) message()
TRUE
},
warning = function(w) {
# Treat warning as error, add an empty line with message()
message(w)
message()
TRUE TRUE
}) })
!isFail !isFail
...@@ -234,10 +260,9 @@ sparkCachePath <- function() { ...@@ -234,10 +260,9 @@ sparkCachePath <- function() {
if (.Platform$OS.type == "windows") { if (.Platform$OS.type == "windows") {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA) winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
if (is.na(winAppPath)) { if (is.na(winAppPath)) {
msg <- paste("%LOCALAPPDATA% not found.", stop(paste("%LOCALAPPDATA% not found.",
"Please define the environment variable", "Please define the environment variable",
"or restart and enter an installation path in localDir.") "or restart and enter an installation path in localDir."))
stop(msg)
} else { } else {
path <- file.path(winAppPath, "Apache", "Spark", "Cache") path <- file.path(winAppPath, "Apache", "Spark", "Cache")
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment