Skip to content
Snippets Groups Projects
Commit 7a9e25c3 authored by Jeff Zhang's avatar Jeff Zhang Committed by Marcelo Vanzin
Browse files

[SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow set pythonExec of driver and executor through conf…

Before this PR, user have to export environment variable to specify the python of driver & executor which is not so convenient for users. This PR is trying to allow user to specify python through configuration "--pyspark-driver-python" & "--pyspark-executor-python"

Manually test in local & yarn mode for pyspark-shell and pyspark batch mode.

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13146 from zjffdu/SPARK-13081.
parent ea0bf91b
No related branches found
No related tags found
No related merge requests found
......@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.util.Try
import org.apache.spark.SparkUserAppException
import org.apache.spark.{SparkConf, SparkUserAppException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.internal.config._
import org.apache.spark.util.{RedirectThread, Utils}
/**
......@@ -37,8 +38,12 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
val sparkConf = new SparkConf()
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python")
// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
......@@ -77,6 +82,9 @@ object PythonRunner {
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
// pass conf spark.pyspark.python to python process, the only way to pass info to
// python process is through environment variable.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
try {
val process = builder.start()
......
......@@ -106,4 +106,12 @@ package object config {
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
.createOptional
private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
.stringConf
.createOptional
private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
.stringConf
.createOptional
}
......@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
import org.apache.spark.internal.config.package$;
/**
* These tests require the Spark assembly to be built before they can be run.
*/
......@@ -89,6 +91,12 @@ public class SparkLauncherSuite {
launcher.setConf("spark.foo", "foo");
launcher.addSparkArg(opts.CONF, "spark.foo=bar");
assertEquals("bar", launcher.builder.conf.get("spark.foo"));
launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
assertEquals("python3.4", launcher.builder.conf.get(
package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
}
@Test(expected=IllegalStateException.class)
......
......@@ -51,8 +51,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("loading from system properties") {
System.setProperty("spark.test.testProperty", "2")
System.setProperty("nonspark.test.testProperty", "0")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
assert(!conf.contains("nonspark.test.testProperty"))
}
test("initializing without loading defaults") {
......
......@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{ResetSystemProperties, Utils}
......@@ -512,6 +513,8 @@ class SparkSubmitSuite
val clArgs3 = Seq(
"--master", "local",
"--py-files", pyFiles,
"--conf", "spark.pyspark.driver.python=python3.4",
"--conf", "spark.pyspark.python=python3.5",
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
......@@ -519,6 +522,8 @@ class SparkSubmitSuite
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
sysProps3("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
}
test("resolves config paths correctly") {
......
......@@ -427,6 +427,21 @@ Apart from these, the following properties are also available, and may be useful
with <code>spark.jars.packages</code>.
</td>
</tr>
<tr>
<td><code>spark.pyspark.driver.python</code></td>
<td></td>
<td>
Python binary executable to use for PySpark in driver.
(default is <code>spark.pyspark.python</code>)
</td>
</tr>
<tr>
<td><code>spark.pyspark.python</code></td>
<td></td>
<td>
Python binary executable to use for PySpark in both driver and executors.
</td>
</tr>
</table>
#### Shuffle Behavior
......@@ -1786,11 +1801,13 @@ The following variables can be set in `spark-env.sh`:
</tr>
<tr>
<td><code>PYSPARK_PYTHON</code></td>
<td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).</td>
<td>Python binary executable to use for PySpark in both driver and workers (default is <code>python2.7</code> if available, otherwise <code>python</code>).
Property <code>spark.pyspark.python</code> take precedence if it is set</td>
</tr>
<tr>
<td><code>PYSPARK_DRIVER_PYTHON</code></td>
<td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).</td>
<td>Python binary executable to use for PySpark in driver only (default is <code>PYSPARK_PYTHON</code>).
Property <code>spark.pyspark.driver.python</code> take precedence if it is set</td>
</tr>
<tr>
<td><code>SPARKR_DRIVER_R</code></td>
......
......@@ -64,6 +64,10 @@ public class SparkLauncher {
/** Configuration key for the number of executor CPU cores. */
public static final String EXECUTOR_CORES = "spark.executor.cores";
static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";
static final String PYSPARK_PYTHON = "spark.pyspark.python";
/** Logger name to use when launching a child process. */
public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
......
......@@ -294,11 +294,23 @@ class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
appResource = PYSPARK_SHELL_RESOURCE;
constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
// The executable is the PYSPARK_DRIVER_PYTHON env variable set by the pyspark script,
// followed by PYSPARK_DRIVER_PYTHON_OPTS.
// Will pick up the binary executable in the following order
// 1. conf spark.pyspark.driver.python
// 2. conf spark.pyspark.python
// 3. environment variable PYSPARK_DRIVER_PYTHON
// 4. environment variable PYSPARK_PYTHON
// 5. python
List<String> pyargs = new ArrayList<>();
pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
conf.get(SparkLauncher.PYSPARK_PYTHON),
System.getenv("PYSPARK_DRIVER_PYTHON"),
System.getenv("PYSPARK_PYTHON"),
"python"));
String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
// pass conf spark.pyspark.python to python by environment variable.
env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
}
if (!isEmpty(pyOpts)) {
pyargs.addAll(parseOptionString(pyOpts));
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment