Commit 3a60be4b authored by Junyang Qian, committed by Felix Cheung

[SPARKR][MINOR] Add installation message for remote master mode and improve other messages

## What changes were proposed in this pull request?

This PR gives an informative message to users when they try to connect to a remote master but do not have the Spark package installed on their local machine.

As a clarification: for now, automatic installation only happens when users start SparkR in the R console (rather than from sparkr-shell) and connect to a local master. In remote master mode, a local Spark package is still needed, but we do not trigger the install.spark function, because the versions have to match those on the cluster, which requires more user input. Instead, we try to provide a detailed message that may help the users.
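For illustration, a minimal sketch of the two paths described above (the remote master URL is a placeholder, not a real cluster):

```r
library(SparkR)

# Local master from the R console: if Spark is not found in SPARK_HOME or the
# cache directory, install.spark() is triggered automatically.
sparkR.session(master = "local[*]")

# Remote master: a matching local Spark package is still required, but it is
# not installed automatically. Either point SPARK_HOME / sparkHome at an
# existing installation, or install one explicitly with versions that match
# the cluster.
install.spark(hadoopVersion = "2.7")
sparkR.session(master = "spark://example.com:7077")
```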

Some of the other messages have also been slightly changed.

## How was this patch tested?

Manual test.
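The manual check can be reproduced along these lines (a sketch only; the remote master URL is a placeholder, and each call assumes a fresh R session started outside sparkr-shell):

```r
library(SparkR)

# Make sure no existing installation is picked up.
Sys.unsetenv("SPARK_HOME")

# Local master: should print the "Spark not found in SPARK_HOME" message and
# start the automatic installation into the cache directory.
sparkR.session(master = "local[*]")

# Remote master without a local Spark package: should stop with the new
# installation instructions instead of attempting a download.
sparkR.session(master = "spark://example.com:7077")
```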

Author: Junyang Qian <junyangq@databricks.com>

Closes #14761 from junyangq/SPARK-16579-V1.
parent 18708f76
@@ -70,9 +70,9 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                           localDir = NULL, overwrite = FALSE) {
   version <- paste0("spark-", packageVersion("SparkR"))
   hadoopVersion <- tolower(hadoopVersion)
-  hadoopVersionName <- hadoop_version_name(hadoopVersion)
+  hadoopVersionName <- hadoopVersionName(hadoopVersion)
   packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
-  localDir <- ifelse(is.null(localDir), spark_cache_path(),
+  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                      normalizePath(localDir, mustWork = FALSE))
   if (is.na(file.info(localDir)$isdir)) {
@@ -88,12 +88,14 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   # can use dir.exists(packageLocalDir) under R 3.2.0 or later
   if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
-    fmt <- "Spark %s for Hadoop %s is found, and SPARK_HOME set to %s"
+    fmt <- "%s for Hadoop %s found, with SPARK_HOME set to %s"
     msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                    packageLocalDir)
     message(msg)
     Sys.setenv(SPARK_HOME = packageLocalDir)
     return(invisible(packageLocalDir))
+  } else {
+    message("Spark not found in the cache directory. Installation will start.")
   }
   packageLocalPath <- paste0(packageLocalDir, ".tgz")
@@ -102,7 +104,7 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   if (tarExists && !overwrite) {
     message("tar file found.")
   } else {
-    robust_download_tar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
+    robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
   }
   message(sprintf("Installing to %s", localDir))
@@ -116,33 +118,37 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
   invisible(packageLocalDir)
 }
 
-robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   # step 1: use user-provided url
   if (!is.null(mirrorUrl)) {
     msg <- sprintf("Use user-provided mirror site: %s.", mirrorUrl)
     message(msg)
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
-    if (success) return()
+    if (success) {
+      return()
+    } else {
+      message(paste0("Unable to download from mirrorUrl: ", mirrorUrl))
+    }
   } else {
-    message("Mirror site not provided.")
+    message("MirrorUrl not provided.")
   }
   # step 2: use url suggested from apache website
-  message("Looking for site suggested from apache website...")
-  mirrorUrl <- get_preferred_mirror(version, packageName)
+  message("Looking for preferred site from apache website...")
+  mirrorUrl <- getPreferredMirror(version, packageName)
   if (!is.null(mirrorUrl)) {
-    success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                  packageName, packageLocalPath)
     if (success) return()
   } else {
-    message("Unable to find suggested mirror site.")
+    message("Unable to find preferred mirror site.")
   }
   # step 3: use backup option
   message("To use backup site...")
-  mirrorUrl <- default_mirror_url()
-  success <- direct_download_tar(mirrorUrl, version, hadoopVersion,
+  mirrorUrl <- defaultMirrorUrl()
+  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                packageName, packageLocalPath)
   if (success) {
     return(packageLocalPath)
@@ -155,7 +161,7 @@ robust_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   }
 }
 
-get_preferred_mirror <- function(version, packageName) {
+getPreferredMirror <- function(version, packageName) {
   jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                     file.path("spark", version, packageName),
                     ".tgz&as_json=1")
@@ -175,10 +181,10 @@ get_preferred_mirror <- function(version, packageName) {
   mirrorPreferred
 }
 
-direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
+directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
   packageRemotePath <- paste0(
     file.path(mirrorUrl, version, packageName), ".tgz")
-  fmt <- paste("Downloading Spark %s for Hadoop %s from:\n- %s")
+  fmt <- "Downloading %s for Hadoop %s from:\n- %s"
   msg <- sprintf(fmt, version, ifelse(hadoopVersion == "without", "Free build", hadoopVersion),
                  packageRemotePath)
   message(msg)
@@ -192,11 +198,11 @@ direct_download_tar <- function(mirrorUrl, version, hadoopVersion, packageName,
   !isFail
 }
 
-default_mirror_url <- function() {
+defaultMirrorUrl <- function() {
   "http://www-us.apache.org/dist/spark"
 }
 
-hadoop_version_name <- function(hadoopVersion) {
+hadoopVersionName <- function(hadoopVersion) {
   if (hadoopVersion == "without") {
     "without-hadoop"
   } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
@@ -208,7 +214,7 @@ hadoop_version_name <- function(hadoopVersion) {
 # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
 # adapt to Spark context
-spark_cache_path <- function() {
+sparkCachePath <- function() {
   if (.Platform$OS.type == "windows") {
     winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
     if (is.na(winAppPath)) {
@@ -231,3 +237,21 @@ spark_cache_path <- function() {
   }
   normalizePath(path, mustWork = FALSE)
 }
+
+installInstruction <- function(mode) {
+  if (mode == "remote") {
+    paste0("Connecting to a remote Spark master. ",
+           "Please make sure Spark package is also installed in this machine.\n",
+           "- If there is one, set the path in sparkHome parameter or ",
+           "environment variable SPARK_HOME.\n",
+           "- If not, you may run install.spark function to do the job. ",
+           "Please make sure the Spark and the Hadoop versions ",
+           "match the versions on the cluster. ",
+           "SparkR package is compatible with Spark ", packageVersion("SparkR"), ".",
+           "If you need further help, ",
+           "contact the administrators of the cluster.")
+  } else {
+    stop(paste0("No instruction found for ", mode, " mode."))
+  }
+}
@@ -366,25 +366,10 @@ sparkR.session <- function(
     }
     overrideEnvs(sparkConfigMap, paramMap)
   }
-  # do not download if it is run in the sparkR shell
-  if (!nzchar(master) || is_master_local(master)) {
-    if (!is_sparkR_shell()) {
-      if (is.na(file.info(sparkHome)$isdir)) {
-        msg <- paste0("Spark not found in SPARK_HOME: ",
-                      sparkHome,
-                      " .\nTo search in the cache directory. ",
-                      "Installation will start if not found.")
-        message(msg)
-        packageLocalDir <- install.spark()
-        sparkHome <- packageLocalDir
-      } else {
-        msg <- paste0("Spark package is found in SPARK_HOME: ", sparkHome)
-        message(msg)
-      }
-    }
-  }
   if (!exists(".sparkRjsc", envir = .sparkREnv)) {
+    retHome <- sparkCheckInstall(sparkHome, master)
+    if (!is.null(retHome)) sparkHome <- retHome
     sparkExecutorEnvMap <- new.env()
     sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
                         sparkJars, sparkPackages)
@@ -547,3 +532,35 @@ processSparkPackages <- function(packages) {
   }
   splittedPackages
 }
+
+# Utility function that checks and install Spark to local folder if not found
+#
+# Installation will not be triggered if it's called from sparkR shell
+# or if the master url is not local
+#
+# @param sparkHome directory to find Spark package.
+# @param master the Spark master URL, used to check local or remote mode.
+# @return NULL if no need to update sparkHome, and new sparkHome otherwise.
+sparkCheckInstall <- function(sparkHome, master) {
+  if (!isSparkRShell()) {
+    if (!is.na(file.info(sparkHome)$isdir)) {
+      msg <- paste0("Spark package found in SPARK_HOME: ", sparkHome)
+      message(msg)
+      NULL
+    } else {
+      if (!nzchar(master) || isMasterLocal(master)) {
+        msg <- paste0("Spark not found in SPARK_HOME: ",
+                      sparkHome)
+        message(msg)
+        packageLocalDir <- install.spark()
+        packageLocalDir
+      } else {
+        msg <- paste0("Spark not found in SPARK_HOME: ",
+                      sparkHome, "\n", installInstruction("remote"))
+        stop(msg)
+      }
+    }
+  } else {
+    NULL
+  }
+}
@@ -690,10 +690,10 @@ getSparkContext <- function() {
   sc
 }
 
-is_master_local <- function(master) {
+isMasterLocal <- function(master) {
   grepl("^local(\\[([0-9]+|\\*)\\])?$", master, perl = TRUE)
 }
 
-is_sparkR_shell <- function() {
+isSparkRShell <- function() {
   grepl(".*shell\\.R$", Sys.getenv("R_PROFILE_USER"), perl = TRUE)
 }