From 7a9e25c38380e6c62080d62ad38a4830e44fe753 Mon Sep 17 00:00:00 2001
From: Jeff Zhang <zjffdu@apache.org>
Date: Thu, 11 Aug 2016 20:08:25 -0700
Subject: [PATCH] [SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow set pythonExec of
 driver and executor through conf…
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before this PR, users had to export environment variables to specify the Python
executable of the driver and executors, which is not convenient. This PR allows
users to specify the Python executable through the configuration properties
"spark.pyspark.driver.python" and "spark.pyspark.python".

Manually tested in local and yarn mode, for both pyspark-shell and pyspark batch mode.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13146 from zjffdu/SPARK-13081.
---
 .../apache/spark/deploy/PythonRunner.scala    | 14 ++++++++++---
 .../spark/internal/config/package.scala       |  8 +++++++
 .../spark/launcher/SparkLauncherSuite.java    |  8 +++++++
 .../org/apache/spark/SparkConfSuite.scala     |  2 ++
 .../spark/deploy/SparkSubmitSuite.scala       |  5 +++++
 docs/configuration.md                         | 21 +++++++++++++++++--
 .../apache/spark/launcher/SparkLauncher.java  |  4 ++++
 .../launcher/SparkSubmitCommandBuilder.java   | 18 +++++++++++++---
 8 files changed, 72 insertions(+), 8 deletions(-)
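For context, here is a minimal sketch (not part of this patch) of how an application could set the new properties when launching a PySpark job through the launcher library touched below; the script path, master, and Python versions are placeholder values. Note that the PYSPARK_DRIVER_PYTHON / PYSPARK_PYTHON constants added to SparkLauncher in this patch are package-private, so external code would use the literal keys:

import org.apache.spark.launcher.SparkLauncher

object PySparkLaunchExample {
  def main(args: Array[String]): Unit = {
    // The two keys introduced by this patch take precedence over the
    // PYSPARK_DRIVER_PYTHON / PYSPARK_PYTHON environment variables.
    val process = new SparkLauncher()
      .setAppResource("/path/to/app.py")   // placeholder application script
      .setMaster("local[*]")
      .setConf("spark.pyspark.driver.python", "python3.4")
      .setConf("spark.pyspark.python", "python3.5")
      .launch()
    process.waitFor()
  }
}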
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 6227a30dc9..0b1cec2df8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConverters._
 import scala.util.Try
 
-import org.apache.spark.SparkUserAppException
+import org.apache.spark.{SparkConf, SparkUserAppException}
 import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.internal.config._
 import org.apache.spark.util.{RedirectThread, Utils}
 
 /**
@@ -37,8 +38,12 @@ object PythonRunner {
     val pythonFile = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-    val pythonExec =
-      sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
+    val sparkConf = new SparkConf()
+    val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+      .orElse(sparkConf.get(PYSPARK_PYTHON))
+      .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+      .orElse(sys.env.get("PYSPARK_PYTHON"))
+      .getOrElse("python")
 
     // Format python file paths before adding them to the PYTHONPATH
     val formattedPythonFile = formatPath(pythonFile)
@@ -77,6 +82,9 @@ object PythonRunner {
     // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
     env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
     env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+    // pass conf spark.pyspark.python to python process, the only way to pass info to
+    // python process is through environment variable.
+    sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
     builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
     try {
       val process = builder.start()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e646d9964a..be3dac4d24 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -106,4 +106,12 @@ package object config {
   private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
     .stringConf
     .createOptional
+
+  private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
+    .stringConf
+    .createOptional
+
+  private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index e393db06a0..682d98867b 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 import static org.junit.Assert.*;
 
+import org.apache.spark.internal.config.package$;
+
 /**
  * These tests require the Spark assembly to be built before they can be run.
  */
@@ -89,6 +91,12 @@ public class SparkLauncherSuite {
     launcher.setConf("spark.foo", "foo");
     launcher.addSparkArg(opts.CONF, "spark.foo=bar");
     assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+
+    launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
+    launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
+    assertEquals("python3.4", launcher.builder.conf.get(
+      package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
+    assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
   }
 
   @Test(expected=IllegalStateException.class)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a883d1b57e..1f0f655a15 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -51,8 +51,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("loading from system properties") {
     System.setProperty("spark.test.testProperty", "2")
+    System.setProperty("nonspark.test.testProperty", "0")
     val conf = new SparkConf()
     assert(conf.get("spark.test.testProperty") === "2")
+    assert(!conf.contains("nonspark.test.testProperty"))
   }
 
   test("initializing without loading defaults") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b2bc886108..961ece3e00 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.TestUtils.JavaSourceFromString
 import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -512,6 +513,8 @@ class SparkSubmitSuite
     val clArgs3 = Seq(
       "--master", "local",
       "--py-files", pyFiles,
+      "--conf", "spark.pyspark.driver.python=python3.4",
+      "--conf", "spark.pyspark.python=python3.5",
       "mister.py"
     )
     val appArgs3 = new SparkSubmitArguments(clArgs3)
@@ -519,6 +522,8 @@ class SparkSubmitSuite
     appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
     sysProps3("spark.submit.pyFiles") should be (
       PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+    sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
+    sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
   }
 
   test("resolves config paths correctly") {
diff --git a/docs/configuration.md b/docs/configuration.md
index e33094b062..ae753189b5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -427,6 +427,21 @@ Apart from these, the following properties are also available, and may be useful
     with <code>spark.jars.packages</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.pyspark.driver.python</code></td>
+  <td></td>
+  <td>
+    Python binary executable to use for PySpark in driver.
+    (default is <code>spark.pyspark.python</code>)
+  </td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.python</code></td>
+  <td></td>
+  <td>
+    Python binary executable to use for PySpark in both driver and executors.
+  </td>
+</tr>
 </table>
 
 #### Shuffle Behavior
@@ -1786,11 +1801,13 @@ The following variables can be set in `spark-env.sh`:
   </tr>
   <tr>
     <td><code>PYSPARK_PYTHON</code></td>
-    <td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).</td>
+    <td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).
+    Property <code>spark.pyspark.python</code> takes precedence if it is set.</td>
   </tr>
   <tr>
     <td><code>PYSPARK_DRIVER_PYTHON</code></td>
-    <td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).</td>
+    <td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).
+    Property <code>spark.pyspark.driver.python</code> takes precedence if it is set.</td>
   </tr>
   <tr>
     <td><code>SPARKR_DRIVER_R</code></td>
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index 41f7f1f3ed..7b7a7bf57b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -64,6 +64,10 @@ public class SparkLauncher {
   /** Configuration key for the number of executor CPU cores. */
   public static final String EXECUTOR_CORES = "spark.executor.cores";
 
+  static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";
+
+  static final String PYSPARK_PYTHON = "spark.pyspark.python";
+
   /** Logger name to use when launching a child process. */
   public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
 
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
index b3ccc4805f..f6da644e4c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -294,11 +294,23 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
     appResource = PYSPARK_SHELL_RESOURCE;
     constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
 
-    // The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
-    // followed by PYSPARK_DRIVER_PYTHON_OPTS.
+    // Will pick up the binary executable in the following order
+    // 1. conf spark.pyspark.driver.python
+    // 2. conf spark.pyspark.python
+    // 3. environment variable PYSPARK_DRIVER_PYTHON
+    // 4. environment variable PYSPARK_PYTHON
+    // 5. python
     List<String> pyargs = new ArrayList<>();
-    pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
+      conf.get(SparkLauncher.PYSPARK_PYTHON),
+      System.getenv("PYSPARK_DRIVER_PYTHON"),
+      System.getenv("PYSPARK_PYTHON"),
+      "python"));
     String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
+      // pass conf spark.pyspark.python to python by environment variable.
+      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
+    }
     if (!isEmpty(pyOpts)) {
       pyargs.addAll(parseOptionString(pyOpts));
     }
--
GitLab
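As a summary of the behavior introduced above, the driver-side Python executable is now resolved with the precedence shown below. This snippet is an illustrative sketch only, not code from the patch; the object and method names are hypothetical:

object PythonExecResolution {
  // Mirrors the precedence implemented in PythonRunner and SparkSubmitCommandBuilder.
  def resolveDriverPython(conf: Map[String, String], env: Map[String, String]): String =
    conf.get("spark.pyspark.driver.python")      // 1. Spark conf, driver-specific
      .orElse(conf.get("spark.pyspark.python"))  // 2. Spark conf, shared by driver and executors
      .orElse(env.get("PYSPARK_DRIVER_PYTHON"))  // 3. environment variable, driver-specific
      .orElse(env.get("PYSPARK_PYTHON"))         // 4. environment variable, shared
      .getOrElse("python")                       // 5. built-in default
}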