Skip to content
Snippets Groups Projects
Commit f5e63751 authored by lianhuiwang's avatar lianhuiwang Committed by Andrew Or
Browse files

[SPARK-5173]support python application running on yarn cluster mode

now when we run python application on yarn cluster mode through spark-submit, spark-submit does not support python application on yarn cluster mode. so i modify code of submit and yarn's AM in order to support it.
through specifying .py file or primaryResource file via spark-submit, we can make pyspark run in yarn-cluster mode.
example:spark-submit --master yarn-master --num-executors 1 --driver-memory 1g --executor-memory 1g  xx.py --primaryResource yy.conf
this config is same as pyspark on yarn-client mode.
firstly,we put local path of .py or primaryResource to yarn's dist.files.that can be distributed on slave nodes.and then in spark-submit we transfer --py-files and --primaryResource to yarn.Client and use "org.apache.spark.deploy.PythonRunner" to user class that can run .py files on ApplicationMaster.
in yarn.Client we transfer --py-files and --primaryResource to  ApplicationMaster.
in ApplicationMaster, user's class is org.apache.spark.deploy.PythonRunner, and user's args is primaryResource and -py-files. so that can make pyspark run on ApplicationMaster.
JoshRosen tgravescs sryza

Author: lianhuiwang <lianhuiwang09@gmail.com>
Author: Wang Lianhui <lianhuiwang09@gmail.com>

Closes #3976 from lianhuiwang/SPARK-5173 and squashes the following commits:

28a8a58 [lianhuiwang] fix variable name
67f8cee [lianhuiwang] update with andrewor's comments
0319ae3 [lianhuiwang] address with sryza's comments
2385ef6 [lianhuiwang] address with sryza's comments
03640ab [lianhuiwang] add sparkHome to env
47d2fc3 [lianhuiwang] fix test
2adc8f5 [lianhuiwang] add spark.test.home
d60bc60 [lianhuiwang] fix test
5b30064 [lianhuiwang] add test
097a5ec [lianhuiwang] fix line length exceeds 100
905a106 [lianhuiwang] update with sryza and andrewor 's comments
f1f55b6 [lianhuiwang] when yarn-cluster, all python files can be non-local
172eec1 [Wang Lianhui] fix a min submit's bug
9c941bc [lianhuiwang] support python application running on yarn cluster mode
parent b2047b55
No related branches found
No related tags found
No related merge requests found
......@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}
/**
* A main class used by spark-submit to launch Python applications. It executes python as a
* A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
......
......@@ -23,6 +23,8 @@ import java.net.URL
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
import org.apache.hadoop.fs.Path
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.util.Utils
......@@ -134,12 +136,27 @@ object SparkSubmit {
}
}
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
// Require all python files to be local, so we can add them to the PYTHONPATH
// In YARN cluster mode, python files are distributed as regular files, which can be non-local
if (args.isPython && !isYarnCluster) {
if (Utils.nonLocalPaths(args.primaryResource).nonEmpty) {
printErrorAndExit(s"Only local python files are supported: $args.primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(args.pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
printErrorAndExit(s"Only local additional python files are supported: $nonLocalPyFiles")
}
}
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
......@@ -150,7 +167,7 @@ object SparkSubmit {
}
// If we're running a python app, set the main class to our specific python runner
if (args.isPython) {
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "py4j.GatewayServer"
args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
......@@ -167,6 +184,13 @@ object SparkSubmit {
}
}
// In yarn-cluster mode for a python app, add primary resource and pyFiles to files
// that can be distributed with the job
if (args.isPython && isYarnCluster) {
args.files = mergeFileLists(args.files, args.primaryResource)
args.files = mergeFileLists(args.files, args.pyFiles)
}
// Special flag to avoid deprecation warnings at the client
sysProps("SPARK_SUBMIT") = "true"
......@@ -245,7 +269,6 @@ object SparkSubmit {
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python files, the primary resource is already distributed as a regular file
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
if (!isYarnCluster && !args.isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
if (isUserJar(args.primaryResource)) {
......@@ -270,10 +293,22 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
if (args.isPython) {
val mainPyFile = new Path(args.primaryResource).getName
childArgs += ("--primary-py-file", mainPyFile)
if (args.pyFiles != null) {
// These files will be distributed to each machine's working directory, so strip the
// path prefix
val pyFilesNames = args.pyFiles.split(",").map(p => (new Path(p)).getName).mkString(",")
childArgs += ("--py-files", pyFilesNames)
}
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else {
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
......
......@@ -179,18 +179,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script")
}
// Require all python files to be local, so we can add them to the PYTHONPATH
if (isPython) {
if (Utils.nonLocalPaths(primaryResource).nonEmpty) {
SparkSubmit.printErrorAndExit(s"Only local python files are supported: $primaryResource")
}
val nonLocalPyFiles = Utils.nonLocalPaths(pyFiles).mkString(",")
if (nonLocalPyFiles.nonEmpty) {
SparkSubmit.printErrorAndExit(
s"Only local additional python files are supported: $nonLocalPyFiles")
}
}
if (master.startsWith("yarn")) {
val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
if (!hasHadoopEnv && !Utils.isTesting) {
......
......@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
......@@ -135,7 +135,7 @@ private[spark] class ApplicationMaster(
.get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
// Call this to force generation of secret so it gets populated into the
// Hadoop UGI. This has to happen before the startUserClass which does a
// Hadoop UGI. This has to happen before the startUserApplication which does a
// doAs in order for the credentials to be passed on to the executor containers.
val securityMgr = new SecurityManager(sparkConf)
......@@ -254,7 +254,7 @@ private[spark] class ApplicationMaster(
private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
userClassThread = startUserClass()
userClassThread = startUserApplication()
// This a bit hacky, but we need to wait until the spark.driver.port property has
// been set by the Thread executing the user class.
......@@ -448,9 +448,13 @@ private[spark] class ApplicationMaster(
*
* Returns the user thread that was started.
*/
private def startUserClass(): Thread = {
logInfo("Starting the user JAR in a separate Thread")
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
System.setProperty("spark.executor.instances", args.numExecutors.toString)
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
System.setProperty("spark.submit.pyFiles",
PythonRunner.formatPaths(args.pyFiles).mkString(","))
}
val mainMethod = Class.forName(args.userClass, false,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
......
......@@ -24,6 +24,8 @@ import collection.mutable.ArrayBuffer
class ApplicationMasterArguments(val args: Array[String]) {
var userJar: String = null
var userClass: String = null
var primaryPyFile: String = null
var pyFiles: String = null
var userArgs: Seq[String] = Seq[String]()
var executorMemory = 1024
var executorCores = 1
......@@ -48,6 +50,14 @@ class ApplicationMasterArguments(val args: Array[String]) {
userClass = value
args = tail
case ("--primary-py-file") :: value :: tail =>
primaryPyFile = value
args = tail
case ("--py-files") :: value :: tail =>
pyFiles = value
args = tail
case ("--args" | "--arg") :: value :: tail =>
userArgsBuffer += value
args = tail
......@@ -81,6 +91,9 @@ class ApplicationMasterArguments(val args: Array[String]) {
|Options:
| --jar JAR_PATH Path to your application's JAR file
| --class CLASS_NAME Name of your application's main class
| --primary-py-file A main Python file
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| place on the PYTHONPATH for Python apps.
| --args ARGS Arguments to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
......
......@@ -21,7 +21,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, ListBuffer, Map}
import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
import scala.util.{Try, Success, Failure}
import com.google.common.base.Objects
......@@ -477,17 +477,32 @@ private[spark] class Client(
} else {
Nil
}
val primaryPyFile =
if (args.primaryPyFile != null) {
Seq("--primary-py-file", args.primaryPyFile)
} else {
Nil
}
val pyFiles =
if (args.pyFiles != null) {
Seq("--py-files", args.pyFiles)
} else {
Nil
}
val amClass =
if (isClusterMode) {
Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
args.userArgs = ArrayBuffer(args.primaryPyFile, args.pyFiles) ++ args.userArgs
}
val userArgs = args.userArgs.flatMap { arg =>
Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg))
}
val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ userArgs ++
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ pyFiles ++ userArgs ++
Seq(
"--executor-memory", args.executorMemory.toString + "m",
"--executor-cores", args.executorCores.toString,
......
......@@ -30,7 +30,9 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
var archives: String = null
var userJar: String = null
var userClass: String = null
var userArgs: Seq[String] = Seq[String]()
var pyFiles: String = null
var primaryPyFile: String = null
var userArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
var executorMemory = 1024 // MB
var executorCores = 1
var numExecutors = DEFAULT_NUMBER_EXECUTORS
......@@ -132,7 +134,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
}
private def parseArgs(inputArgs: List[String]): Unit = {
val userArgsBuffer = new ArrayBuffer[String]()
var args = inputArgs
while (!args.isEmpty) {
......@@ -145,11 +146,15 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
userClass = value
args = tail
case ("--primary-py-file") :: value :: tail =>
primaryPyFile = value
args = tail
case ("--args" | "--arg") :: value :: tail =>
if (args(0) == "--args") {
println("--args is deprecated. Use --arg instead.")
}
userArgsBuffer += value
userArgs += value
args = tail
case ("--master-class" | "--am-class") :: value :: tail =>
......@@ -205,6 +210,10 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
addJars = value
args = tail
case ("--py-files") :: value :: tail =>
pyFiles = value
args = tail
case ("--files") :: value :: tail =>
files = value
args = tail
......@@ -219,8 +228,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
throw new IllegalArgumentException(getUsageMessage(args))
}
}
userArgs = userArgsBuffer.readOnly
}
private def getUsageMessage(unknownParam: List[String] = null): String = {
......@@ -232,6 +239,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster
| mode)
| --class CLASS_NAME Name of your application's main class (required)
| --primary-py-file A main Python file
| --arg ARG Argument to be passed to your application's main class.
| Multiple invocations are possible, each will be passed in order.
| --num-executors NUM Number of executors to start (Default: 2)
......@@ -244,6 +252,8 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| 'default')
| --addJars jars Comma separated list of local jars that want SparkContext.addJar
| to work with.
| --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to
| place on the PYTHONPATH for Python apps.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.
""".stripMargin
......
......@@ -45,6 +45,29 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
""".stripMargin
private val TEST_PYFILE = """
|import sys
|from operator import add
|
|from pyspark import SparkConf , SparkContext
|if __name__ == "__main__":
| if len(sys.argv) != 3:
| print >> sys.stderr, "Usage: test.py [master] [result file]"
| exit(-1)
| conf = SparkConf()
| conf.setMaster(sys.argv[1]).setAppName("python test in yarn cluster mode")
| sc = SparkContext(conf=conf)
| status = open(sys.argv[2],'w')
| result = "failure"
| rdd = sc.parallelize(range(10))
| cnt = rdd.count()
| if cnt == 10:
| result = "success"
| status.write(result)
| status.close()
| sc.stop()
""".stripMargin
private var yarnCluster: MiniYARNCluster = _
private var tempDir: File = _
private var fakeSparkJar: File = _
......@@ -98,6 +121,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
sys.props += ("spark.yarn.appMasterEnv.SPARK_HOME" -> sparkHome)
sys.props += ("spark.executorEnv.SPARK_HOME" -> sparkHome)
sys.props += ("spark.yarn.jar" -> ("local:" + fakeSparkJar.getAbsolutePath()))
sys.props += ("spark.executor.instances" -> "1")
sys.props += ("spark.driver.extraClassPath" -> childClasspath)
......@@ -146,6 +172,24 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
assert(Utils.exceptionString(exception).contains("Application finished with failed status"))
}
test("run Python application in yarn-cluster mode") {
val primaryPyFile = new File(tempDir, "test.py")
Files.write(TEST_PYFILE, primaryPyFile, Charsets.UTF_8)
val pyFile = new File(tempDir, "test2.py")
Files.write(TEST_PYFILE, pyFile, Charsets.UTF_8)
var result = File.createTempFile("result", null, tempDir)
val args = Array("--class", "org.apache.spark.deploy.PythonRunner",
"--primary-py-file", primaryPyFile.getAbsolutePath(),
"--py-files", pyFile.getAbsolutePath(),
"--arg", "yarn-cluster",
"--arg", result.getAbsolutePath(),
"--name", "python test in yarn-cluster mode",
"--num-executors", "1")
Client.main(args)
checkResult(result)
}
/**
* This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
* any sort of error when the job process finishes successfully, but the job itself fails. So
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment