From 7557c4cfef2398d124b00472e2696f0559a36ef7 Mon Sep 17 00:00:00 2001
From: Andrew Or <andrewor14@gmail.com>
Date: Tue, 26 Aug 2014 22:52:16 -0700
Subject: [PATCH] [SPARK-3167] Handle special driver configs in Windows

This is an effort to bring the Windows scripts up to speed after recent
splashing changes in #1845.

Author: Andrew Or <andrewor14@gmail.com>

Closes #2129 from andrewor14/windows-config and squashes the following commits:

881a8f0 [Andrew Or] Add reference to Windows taskkill
92e6047 [Andrew Or] Update a few comments (minor)
22b1acd [Andrew Or] Fix style again (minor)
afcffea [Andrew Or] Fix style (minor)
72004c2 [Andrew Or] Actually respect --driver-java-options
803218b [Andrew Or] Actually respect SPARK_*_CLASSPATH
eeb34a0 [Andrew Or] Update outdated comment (minor)
35caecc [Andrew Or] In Windows, actually kill Java processes on exit
f97daa2 [Andrew Or] Fix Windows spark shell stdin issue
83ebe60 [Andrew Or] Parse special driver configs in Windows (broken)
---
 bin/compute-classpath.cmd                   |  3 +-
 bin/spark-class2.cmd                        | 46 ++++++++++++++++---
 bin/spark-submit                            |  2 +-
 bin/spark-submit.cmd                        | 34 +++++++++-----
 .../SparkSubmitDriverBootstrapper.scala     | 19 +++++---
 python/pyspark/java_gateway.py              | 17 +++++++
 6 files changed, 95 insertions(+), 26 deletions(-)
 mode change 100755 => 100644 bin/spark-class2.cmd

diff --git a/bin/compute-classpath.cmd b/bin/compute-classpath.cmd
index 58710cd1bd..5ad52452a5 100644
--- a/bin/compute-classpath.cmd
+++ b/bin/compute-classpath.cmd
@@ -36,7 +36,8 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Build up classpath
-set CLASSPATH=%FWDIR%conf
+set CLASSPATH=%SPARK_CLASSPATH%;%SPARK_SUBMIT_CLASSPATH%;%FWDIR%conf
+
 if exist "%FWDIR%RELEASE" (
   for %%d in ("%FWDIR%lib\spark-assembly*.jar") do (
     set ASSEMBLY_JAR=%%d
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
old mode 100755
new mode 100644
index e2c5f9c385..6c56728191
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -17,6 +17,8 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
+rem Any changes to this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 setlocal enabledelayedexpansion
 
 set SCALA_VERSION=2.10
@@ -38,7 +40,7 @@ if not "x%1"=="x" goto arg_given
 
 if not "x%SPARK_MEM%"=="x" (
   echo Warning: SPARK_MEM is deprecated, please use a more specific config option
-  echo e.g., spark.executor.memory or SPARK_DRIVER_MEMORY.
+  echo e.g., spark.executor.memory or spark.driver.memory.
 )
 
 rem Use SPARK_MEM or 512m as the default memory, to be overridden by specific options
@@ -67,10 +69,18 @@ rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_EXECUTOR_OPTS%
   if not "x%SPARK_EXECUTOR_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_EXECUTOR_MEMORY%
 
-rem All drivers use SPARK_JAVA_OPTS + SPARK_DRIVER_MEMORY. The repl also uses SPARK_REPL_OPTS.
-) else if "%1"=="org.apache.spark.repl.Main" (
-  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_REPL_OPTS%
+rem Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
+rem SPARK_DRIVER_MEMORY + SPARK_SUBMIT_DRIVER_MEMORY.
+rem The repl also uses SPARK_REPL_OPTS.
+) else if "%1"=="org.apache.spark.deploy.SparkSubmit" (
+  set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS% %SPARK_SUBMIT_OPTS% %SPARK_REPL_OPTS%
+  if not "x%SPARK_SUBMIT_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_SUBMIT_LIBRARY_PATH%
+  ) else if not "x%SPARK_LIBRARY_PATH%"=="x" (
+    set OUR_JAVA_OPTS=!OUR_JAVA_OPTS! -Djava.library.path=%SPARK_LIBRARY_PATH%
+  )
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
+  if not "x%SPARK_SUBMIT_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_SUBMIT_DRIVER_MEMORY%
 ) else (
   set OUR_JAVA_OPTS=%SPARK_JAVA_OPTS%
   if not "x%SPARK_DRIVER_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DRIVER_MEMORY%
@@ -80,9 +90,9 @@ rem Set JAVA_OPTS to be able to load native libraries and to set heap size
 for /f "tokens=3" %%i in ('java -version 2^>^&1 ^| find "version"') do set jversion=%%i
 for /f "tokens=1 delims=_" %%i in ("%jversion:~1,-1%") do set jversion=%%i
 if "%jversion%" geq "1.8.0" (
-  set JAVA_OPTS=%OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+  set JAVA_OPTS=%OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 ) else (
-  set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Djava.library.path=%SPARK_LIBRARY_PATH% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
+  set JAVA_OPTS=-XX:MaxPermSize=128m %OUR_JAVA_OPTS% -Xms%OUR_JAVA_MEM% -Xmx%OUR_JAVA_MEM%
 )
 
 rem Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
@@ -115,5 +125,27 @@ rem Figure out where java is.
 set RUNNER=java
 if not "x%JAVA_HOME%"=="x" set RUNNER=%JAVA_HOME%\bin\java
 
-"%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+rem In Spark submit client mode, the driver is launched in the same JVM as Spark submit itself.
+rem Here we must parse the properties file for relevant "spark.driver.*" configs before launching
+rem the driver JVM itself. Instead of handling this complexity here, we launch a separate JVM
+rem to prepare the launch environment of this driver JVM.
+
+rem In this case, leave out the main class (org.apache.spark.deploy.SparkSubmit) and use our own.
+rem Leaving out the first argument is surprisingly difficult to do in Windows. Note that this must
+rem be done here because the Windows "shift" command does not work in a conditional block.
+set BOOTSTRAP_ARGS=
+shift
+:start_parse
+if "%~1" == "" goto end_parse
+set BOOTSTRAP_ARGS=%BOOTSTRAP_ARGS% %~1
+shift
+goto start_parse
+:end_parse
+
+if not [%SPARK_SUBMIT_BOOTSTRAP_DRIVER%] == [] (
+  set SPARK_CLASS=1
+  "%RUNNER%" org.apache.spark.deploy.SparkSubmitDriverBootstrapper %BOOTSTRAP_ARGS%
+) else (
+  "%RUNNER%" -cp "%CLASSPATH%" %JAVA_OPTS% %*
+)
 :exit
diff --git a/bin/spark-submit b/bin/spark-submit
index 32c911cd04..277c4ce571 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# NOTE: Any changes in this file must be reflected in SparkClassLauncher.scala!
+# NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
 
 export SPARK_HOME="$(cd `dirname $0`/..; pwd)"
 ORIG_ARGS=("$@")
diff --git a/bin/spark-submit.cmd b/bin/spark-submit.cmd
index 6eb702ed8c..cf6046d154 100644
--- a/bin/spark-submit.cmd
+++ b/bin/spark-submit.cmd
@@ -17,23 +17,28 @@ rem See the License for the specific language governing permissions and
 rem limitations under the License.
 rem
 
+rem NOTE: Any changes in this file must be reflected in SparkSubmitDriverBootstrapper.scala!
+
 set SPARK_HOME=%~dp0..
 set ORIG_ARGS=%*
 
-rem Clear the values of all variables used
-set DEPLOY_MODE=
-set DRIVER_MEMORY=
+rem Reset the values of all variables used
+set SPARK_SUBMIT_DEPLOY_MODE=client
+set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf
+set SPARK_SUBMIT_DRIVER_MEMORY=
 set SPARK_SUBMIT_LIBRARY_PATH=
 set SPARK_SUBMIT_CLASSPATH=
 set SPARK_SUBMIT_OPTS=
-set SPARK_DRIVER_MEMORY=
+set SPARK_SUBMIT_BOOTSTRAP_DRIVER=
 
 :loop
 if [%1] == [] goto continue
   if [%1] == [--deploy-mode] (
-    set DEPLOY_MODE=%2
+    set SPARK_SUBMIT_DEPLOY_MODE=%2
+  ) else if [%1] == [--properties-file] (
+    set SPARK_SUBMIT_PROPERTIES_FILE=%2
   ) else if [%1] == [--driver-memory] (
-    set DRIVER_MEMORY=%2
+    set SPARK_SUBMIT_DRIVER_MEMORY=%2
   ) else if [%1] == [--driver-library-path] (
     set SPARK_SUBMIT_LIBRARY_PATH=%2
   ) else if [%1] == [--driver-class-path] (
@@ -45,12 +50,19 @@ if [%1] == [] goto continue
 goto loop
 :continue
 
-if [%DEPLOY_MODE%] == [] (
-  set DEPLOY_MODE=client
-)
+rem For client mode, the driver will be launched in the same JVM that launches
+rem SparkSubmit, so we may need to read the properties file for any extra class
+rem paths, library paths, java options and memory early on. Otherwise, it will
+rem be too late by the time the driver JVM has started.
 
-if not [%DRIVER_MEMORY%] == [] if [%DEPLOY_MODE%] == [client] (
-  set SPARK_DRIVER_MEMORY=%DRIVER_MEMORY%
+if [%SPARK_SUBMIT_DEPLOY_MODE%] == [client] (
+  if exist %SPARK_SUBMIT_PROPERTIES_FILE% (
+    rem Parse the properties file only if the special configs exist
+    for /f %%i in ('findstr /r /c:"^[\t ]*spark.driver.memory" /c:"^[\t ]*spark.driver.extra" ^
+      %SPARK_SUBMIT_PROPERTIES_FILE%') do (
+      set SPARK_SUBMIT_BOOTSTRAP_DRIVER=1
+    )
+  )
 )
 
 cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.spark.deploy.SparkSubmit %ORIG_ARGS%
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index af607e6a4a..7ca96ed57c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -133,17 +133,24 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val process = builder.start()
 
     // Redirect stdin, stdout, and stderr to/from the child JVM
-    val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
-    stdinThread.start()
     stdoutThread.start()
     stderrThread.start()
 
-    // Terminate on broken pipe, which signals that the parent process has exited. This is
-    // important for the PySpark shell, where Spark submit itself is a python subprocess.
-    stdinThread.join()
-    process.destroy()
+    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
+    // a thread that contends with the subprocess in reading from System.in.
+    if (Utils.isWindows) {
+      // For the PySpark shell, the termination of this process is handled in java_gateway.py
+      process.waitFor()
+    } else {
+      // Terminate on broken pipe, which signals that the parent process has exited. This is
+      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+      stdinThread.start()
+      stdinThread.join()
+      process.destroy()
+    }
   }
 
 }
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6f4f62f23b..9c70fa5c16 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import atexit
 import os
 import sys
 import signal
@@ -69,6 +70,22 @@ def launch_gateway():
                 error_msg += "--------------------------------------------------------------\n"
             raise Exception(error_msg)
 
+        # In Windows, ensure the Java child processes do not linger after Python has exited.
+        # In UNIX-based systems, the child process can kill itself on broken pipe (i.e. when
+        # the parent process' stdin sends an EOF). In Windows, however, this is not possible
+        # because java.lang.Process reads directly from the parent process' stdin, contending
+        # with any opportunity to read an EOF from the parent. Note that this is only best
+        # effort and will not take effect if the python process is violently terminated.
+        if on_windows:
+            # In Windows, the child process here is "spark-submit.cmd", not the JVM itself
+            # (because the UNIX "exec" command is not available). This means we cannot simply
+            # call proc.kill(), which kills only the "spark-submit.cmd" process but not the
+            # JVMs. Instead, we use "taskkill" with the tree-kill option "/t" to terminate all
+            # child processes in the tree (http://technet.microsoft.com/en-us/library/bb491009.aspx)
+            def killChild():
+                Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])
+            atexit.register(killChild)
+
         # Create a thread to echo output from the GatewayServer, which is required
         # for Java log output to show up:
         class EchoOutputThread(Thread):
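The cleanup logic added to java_gateway.py above can be exercised on its own. Below is a minimal standalone sketch of the same atexit + taskkill tree-kill pattern, assuming a Windows host; the ping command is only a stand-in for the spark-submit.cmd child process and is not part of the patch.

import atexit
from subprocess import Popen

# Stand-in long-running child process; in the patch this is the "spark-submit.cmd"
# subprocess, which in turn spawns the JVMs that actually need to be terminated.
proc = Popen(["cmd", "/c", "ping", "-n", "60", "localhost"])

def kill_child_tree():
    # "/f" forces termination and "/t" kills the whole process tree rooted at proc.pid,
    # so any JVM grandchildren go down together with the .cmd wrapper.
    Popen(["cmd", "/c", "taskkill", "/f", "/t", "/pid", str(proc.pid)])

# Runs at normal interpreter exit; as the patch notes, this is best effort only and
# does not fire if the Python process is terminated forcibly.
atexit.register(kill_child_tree)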