Skip to content
Snippets Groups Projects
Commit b14d7b5c authored by KevinGrealish's avatar KevinGrealish Committed by Marcelo Vanzin
Browse files

[SPARK-16110][YARN][PYSPARK] Fix allowing python version to be specified per...

[SPARK-16110][YARN][PYSPARK] Fix allowing python version to be specified per submit for cluster mode.

## What changes were proposed in this pull request?

This fix allows submit of pyspark jobs to specify python 2 or 3.

Change ordering in setup for application master environment so env vars PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON can be overridden by spark.yarn.appMasterEnv.* conf settings. This applies to YARN in cluster mode. This allows them to be set per submission without needing to unset the env vars (which is not always possible — e.g. batch submit with LIVY only exposes the arguments to spark-submit).

## How was this patch tested?
Manual and existing unit tests.

Author: KevinGrealish <KevinGre@microsoft.com>

Closes #13824 from KevinGrealish/SPARK-16110.
parent bc4851ad
No related branches found
No related tags found
No related merge requests found
......@@ -831,8 +831,11 @@ private[spark] class Client(
env("SPARK_JAVA_OPTS") = value
}
// propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode
sys.env.get("PYSPARK_DRIVER_PYTHON").foreach(env("PYSPARK_DRIVER_PYTHON") = _)
sys.env.get("PYSPARK_PYTHON").foreach(env("PYSPARK_PYTHON") = _)
Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
if (!env.contains(envname)) {
sys.env.get(envname).foreach(env(envname) = _)
}
}
}
sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
......
......@@ -138,6 +138,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testPySpark(false)
}
// Regression test for SPARK-16110: in yarn-cluster mode, spark.yarn.appMasterEnv.*
// settings must take precedence over PYSPARK_DRIVER_PYTHON / PYSPARK_PYTHON
// inherited from the submitter's environment. The local env vars are deliberately
// set to an invalid interpreter ("not python"), so the job can only succeed if the
// appMasterEnv conf values (a real python, or the submitter's own PYSPARK_* value
// when one is set) override them inside the application master.
test("run Python application in yarn-cluster mode using " +
" spark.yarn.appMasterEnv to override local envvar") {
testPySpark(
clientMode = false,
extraConf = Map(
"spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON"
-> sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python"),
"spark.yarn.appMasterEnv.PYSPARK_PYTHON"
-> sys.env.getOrElse("PYSPARK_PYTHON", "python")),
extraEnv = Map(
"PYSPARK_DRIVER_PYTHON" -> "not python",
"PYSPARK_PYTHON" -> "not python"))
}
test("user class path first in client mode") {
testUseClassPathFirst(true)
}
......@@ -207,7 +221,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
checkResult(finalState, executorResult, "ORIGINAL")
}
private def testPySpark(clientMode: Boolean): Unit = {
private def testPySpark(
clientMode: Boolean,
extraConf: Map[String, String] = Map(),
extraEnv: Map[String, String] = Map()): Unit = {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
......@@ -218,9 +235,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val pythonPath = Seq(
s"$sparkHome/python/lib/py4j-0.10.1-src.zip",
s"$sparkHome/python")
val extraEnv = Map(
val extraEnvVars = Map(
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator))
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv
val moduleDir =
if (clientMode) {
......@@ -242,7 +259,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
sparkArgs = Seq("--py-files" -> pyFiles),
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnv)
extraEnv = extraEnvVars,
extraConf = extraConf)
checkResult(finalState, result)
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment