diff --git a/R/pkg/R/install.R b/R/pkg/R/install.R
index c6ed88e032a71809f01c9464113b4d3df2138abf..69b0a523b84e40c7a9678521df0e905374f0df3e 100644
--- a/R/pkg/R/install.R
+++ b/R/pkg/R/install.R
@@ -70,9 +70,9 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                           localDir = NULL, overwrite = FALSE) {
   version <- paste0("spark-", packageVersion("SparkR"))
   hadoopVersion <- tolower(hadoopVersion)
-  hadoopVersionName <- hadoop_version_name(hadoopVersion)
+  hadoopVersionName <- hadoopVersionName(hadoopVersion)
   packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
-  localDir <- ifelse(is.null(localDir), spark_cache_path(),
+  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                      normalizePath(localDir, mustWork = FALSE))
 
   if (is.na(file.info(localDir)$isdir)) {
@@ -88,12 +88,14 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
 
   # can use dir.exists(packageLocalDir) under R 3.2.0 or later
   if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
-    fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
+    fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
     msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                    packageLocalDir)
     message(msg)
     Sys.setenv(SPARK_HOME = packageLocalDir)
     return(invisible(packageLocalDir))
+  } else {
+    message("Spark not found in the cache directory. Installation will start.")
   }
 
   packageLocalPath <- paste0(packageLocalDir, ".tgz")
@@ -102,7 +104,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   if (tarExists && !overwrite) {
     message("tar file found.")
   } else {
-    robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
   }
 
   message(sprintf("Installing to %s", localDir))
@@ -116,33 +118,37 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   invisible(packageLocalDir)
 }
 
-robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
     msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
     message(msg)
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                    packageName, packageLocalPath)
-    if (success) return()
+    if (success) {
+      return()
+    } else {
+      message(paste0("Unable to download from mirrorUrl: ", mirrorUrl))
+    }
   } else {
-    message("Mirror site not provided.")
+    message("MirrorUrl not provided.")
   }
 
   # step 2: use url suggested from apache website
-  message("Looking for site suggested from apache website...")
-  mirrorUrl <- get_preferred_mirror(version, packageName)
+  message("Looking for preferred site from apache website...")
+  mirrorUrl <- getPreferredMirror(version, packageName)
   if (!is.null(mirrorUrl)) {
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                    packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find suggested mirror site.")
+    message("Unable to find preferred mirror site.")
  }
 
   # step 3: use backup option
   message("To use backup site...")
-  mirrorUrl <- default_mirror_url()
-  success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+  mirrorUrl <- defaultMirrorUrl()
+  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
   if (success) {
     return(packageLocalPath)
@@ -155,7 +161,7 @@ robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   }
 }
 
-get_preferred_mirror <- function(version, packageName) {
+getPreferredMirror <- function(version, packageName) {
   jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                     file.path("spark", version, packageName),
                     ".tgz&as_json=1")
@@ -175,10 +181,10 @@ get_preferred_mirror <- function(version, packageName) {
   mirrorPreferred
 }
 
-direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   packageRemotePath <- paste0(
     file.path(mirrorUrl, version, packageName), ".tgz")
-  fmt <- paste("Downloading Spark %s for Hadoop %s from:\n- %s")
+  fmt <- "Downloading %s for Hadoop %s from:\n- %s"
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
@@ -192,11 +198,11 @@ direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   !isFail
 }
 
-default_mirror_url <- function() {
+defaultMirrorUrl <- function() {
   "http://www-us.apache.org/dist/spark"
 }
 
-hadoop_version_name <- function(hadoopVersion) {
+hadoopVersionName <- function(hadoopVersion) {
   if (hadoopVersion == "without") {
     "without-hadoop"
   } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
@@ -208,7 +214,7 @@ hadoop_version_name <- function(hadoopVersion) {
 
 # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
-spark_cache_path <- function() {
+sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
@@ -231,3 +237,21 @@ spark_cache_path <- function() {
   }
   normalizePath(path, mustWork = FALSE)
 }
+
+
+installInstruction <- function(mode) {
+  if (mode == "remote") {
+    paste0("Connecting to a remote Spark master. ",
+           "Please make sure Spark package is also installed in this machine.\n",
+           "- If there is one, set the path in sparkHome parameter or ",
+           "environment variable SPARK_HOME.\n",
+           "- If not, you may run install.spark function to do the job. ",
+           "Please make sure the Spark and the Hadoop versions ",
+           "match the versions on the cluster. ",
+           "SparkR package is compatible with Spark ", packageVersion("SparkR"), ".",
+           "If you need further help, ",
+           "contact the administrators of the cluster.")
+  } else {
+    stop(paste0("No instruction found for ", mode, " mode."))
+  }
+}
diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index 85815af1f3639662893e7f7fa201a4cde4baf92a..de53b0bf79b58a20081e55dd0f9a1083415e93e6 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -366,25 +366,10 @@ sparkR.session <- function(
     }
     overrideEnvs(sparkConfigMap, paramMap)
   }
-  # do not download if it is run in the sparkR shell
-  if (!nzchar(master) || is_master_local(master)) {
-    if (!is_sparkR_shell()) {
-      if (is.na(file.info(sparkHome)$isdir)) {
-        msg <- paste0("Spark not found in SPARK_HOME: ",
-                      sparkHome,
-                      " .\nTo search in the cache directory. ",
", - "Installation will start if not found.") - message(msg) - packageLocalDir <- install.spark() - sparkHome <- packageLocalDir - } else { - msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome) - message(msg) - } - } - } if (!exists(".sparkRjsc", envir = .sparkREnv)) { + retHome <- sparkCheckInstall(sparkHome, master) + if (!is.null(retHome)) sparkHome <- retHome sparkExecutorEnvMap <- new.env() sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap, sparkJars, sparkPackages) @@ -547,3 +532,35 @@ processSparkPackages <- function(packages) { } splittedPackages } + +# Utility function that checks and install Spark to local folder if not found +# +# Installation will not be triggered if it's called from sparkR shell +# or if the master url is not local +# +# @param sparkHome directory to find Spark package. +# @param master the Spark master URL, used to check local or remote mode. +# @return NULL if no need to update sparkHome, and new sparkHome otherwise. +sparkCheckInstall <- function(sparkHome, master) { + if (!isSparkRShell()) { + if (!is.na(file.info(sparkHome)$isdir)) { + msg <- paste0("Spark package found in SPARK_HOME: ", sparkHome) + message(msg) + NULL + } else { + if (!nzchar(master) || isMasterLocal(master)) { + msg <- paste0("Spark not found in SPARK_HOME: ", + sparkHome) + message(msg) + packageLocalDir <- install.spark() + packageLocalDir + } else { + msg <- paste0("Spark not found in SPARK_HOME: ", + sparkHome, "\n", installInstruction("remote")) + stop(msg) + } + } + } else { + NULL + } +} diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index d78c0a7a539a89b48335f80183d6c85d3298d8a9..2809ce5d376a5a8e657a5d6439ccec5a01ef7673 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -690,10 +690,10 @@ getSparkContext <- function() { sc } -is_master_local <- function(master) { +isMasterLocal <- function(master) { grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE) } -is_sparkR_shell <- function() { +isSparkRShell <- function() { grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE) }