diff --git a/bin/pyspark b/bin/pyspark index 1d8c94d43d2852556f4ecf36bc85c7f7b623cd14..0b4f695dd06dddf12f5a8e6d7a0f9c2e31d3e573 100755 --- a/bin/pyspark +++ b/bin/pyspark @@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then gatherSparkSubmitOpts "$@" exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}" else - # PySpark shell requires special handling downstream - export PYSPARK_SHELL=1 exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS fi diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd index 59415e9bdec2cf31f8ce63a371f273e45a7a8195..a542ec80b49d630d2d66bc8b2480310f53899fc2 100644 --- a/bin/pyspark2.cmd +++ b/bin/pyspark2.cmd @@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do ( ) if [%PYTHON_FILE%] == [] ( - set PYSPARK_SHELL=1 if [%IPYTHON%] == [1] ( ipython %IPYTHON_OPTS% ) else ( diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala index 7ffff29122d4b2d3e69f5cef438a31098b12e9ea..aa3743ca7df632d82b7b171962570d9f4e568d2e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala @@ -149,14 +149,15 @@ private[spark] object SparkSubmitDriverBootstrapper { // subprocess there already reads directly from our stdin, so we should avoid spawning a // thread that contends with the subprocess in reading from System.in. val isWindows = Utils.isWindows - val isPySparkShell = sys.env.contains("PYSPARK_SHELL") + val isSubprocess = sys.env.contains("IS_SUBPROCESS") if (!isWindows) { val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin") stdinThread.start() - // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM - // should terminate on broken pipe, which signals that the parent process has exited. In - // Windows, the termination logic for the PySpark shell is handled in java_gateway.py - if (isPySparkShell) { + // Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on + // broken pipe, signaling that the parent process has exited. This is the case if the + // application is launched directly from python, as in the PySpark shell. In Windows, + // the termination logic is handled in java_gateway.py + if (isSubprocess) { stdinThread.join() process.destroy() } diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 9c70fa5c16d0ce70d1d817a2cd94063eb9c646c6..a975dc19cb78edfc1295c3f7e2e1ac2560bda5f2 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -45,7 +45,9 @@ def launch_gateway(): # Don't send ctrl-c / SIGINT to the Java gateway: def preexec_func(): signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + env = dict(os.environ) + env["IS_SUBPROCESS"] = "1" # tell JVM to exit after python exits + proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func, env=env) else: # preexec_fn not supported on Windows proc = Popen(command, stdout=PIPE, stdin=PIPE)