Skip to content
Snippets Groups Projects
Commit b087d23e authored by Timothy Chen's avatar Timothy Chen Committed by Andrew Or
Browse files

[SPARK-9669] [MESOS] Support PySpark on Mesos cluster mode.

Support running pyspark with cluster mode on Mesos!
This doesn't upload any scripts, so if running in a remote Mesos requires the user to specify the script from a available URI.

Author: Timothy Chen <tnachen@gmail.com>

Closes #8349 from tnachen/mesos_python.
parent 3339e6f6
No related branches found
No related tags found
No related merge requests found
......@@ -319,9 +319,6 @@ object SparkSubmit {
// The following modes are not supported or applicable
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on Mesos clusters.")
case (MESOS, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on Mesos clusters.")
......@@ -554,7 +551,15 @@ object SparkSubmit {
if (isMesosCluster) {
assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
childArgs += (args.primaryResource, args.mainClass)
if (args.isPython) {
// Second argument is main class
childArgs += (args.primaryResource, "")
if (args.pyFiles != null) {
sysProps("spark.submit.pyFiles") = args.pyFiles
}
} else {
childArgs += (args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
......
......@@ -29,7 +29,6 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem
......@@ -375,21 +374,20 @@ private[spark] class MesosClusterScheduler(
val executorOpts = desc.schedulerProperties.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
envBuilder.addVariables(
Variable.newBuilder().setName("SPARK_EXECUTOR_OPTS").setValue(executorOpts))
val cmdOptions = generateCmdOption(desc).mkString(" ")
val dockerDefined = desc.schedulerProperties.contains("spark.mesos.executor.docker.image")
val executorUri = desc.schedulerProperties.get("spark.executor.uri")
.orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
val appArguments = desc.command.arguments.mkString(" ")
val (executable, jar) = if (dockerDefined) {
// Gets the path to run spark-submit, and the path to the Mesos sandbox.
val (executable, sandboxPath) = if (dockerDefined) {
// Application jar is automatically downloaded in the mounted sandbox by Mesos,
// and the path to the mounted volume is stored in $MESOS_SANDBOX env variable.
("./bin/spark-submit", s"$$MESOS_SANDBOX/${desc.jarUrl.split("/").last}")
("./bin/spark-submit", "$MESOS_SANDBOX")
} else if (executorUri.isDefined) {
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
val folderBasename = executorUri.get.split('/').last.split('.').head
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
val cmdJar = s"../${desc.jarUrl.split("/").last}"
(cmdExecutable, cmdJar)
// Sandbox path points to the parent folder as we chdir into the folderBasename.
(cmdExecutable, "..")
} else {
val executorSparkHome = desc.schedulerProperties.get("spark.mesos.executor.home")
.orElse(conf.getOption("spark.home"))
......@@ -398,30 +396,50 @@ private[spark] class MesosClusterScheduler(
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
}
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
val cmdJar = desc.jarUrl.split("/").last
(cmdExecutable, cmdJar)
// Sandbox points to the current directory by default with Mesos.
(cmdExecutable, ".")
}
builder.setValue(s"$executable $cmdOptions $jar $appArguments")
val primaryResource = new File(sandboxPath, desc.jarUrl.split("/").last).toString()
val cmdOptions = generateCmdOption(desc, sandboxPath).mkString(" ")
val appArguments = desc.command.arguments.mkString(" ")
builder.setValue(s"$executable $cmdOptions $primaryResource $appArguments")
builder.setEnvironment(envBuilder.build())
conf.getOption("spark.mesos.uris").map { uris =>
setupUris(uris, builder)
}
desc.schedulerProperties.get("spark.mesos.uris").map { uris =>
setupUris(uris, builder)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
setupUris(pyFiles, builder)
}
builder.build()
}
private def generateCmdOption(desc: MesosDriverDescription): Seq[String] = {
private def generateCmdOption(desc: MesosDriverDescription, sandboxPath: String): Seq[String] = {
var options = Seq(
"--name", desc.schedulerProperties("spark.app.name"),
"--class", desc.command.mainClass,
"--master", s"mesos://${conf.get("spark.master")}",
"--driver-cores", desc.cores.toString,
"--driver-memory", s"${desc.mem}M")
// Assume empty main class means we're running python
if (!desc.command.mainClass.equals("")) {
options ++= Seq("--class", desc.command.mainClass)
}
desc.schedulerProperties.get("spark.executor.memory").map { v =>
options ++= Seq("--executor-memory", v)
}
desc.schedulerProperties.get("spark.cores.max").map { v =>
options ++= Seq("--total-executor-cores", v)
}
desc.schedulerProperties.get("spark.submit.pyFiles").map { pyFiles =>
val formattedFiles = pyFiles.split(",")
.map { path => new File(sandboxPath, path.split("/").last).toString() }
.mkString(",")
options ++= Seq("--py-files", formattedFiles)
}
options
}
......
......@@ -157,6 +157,8 @@ From the client, you can submit a job to Mesos cluster by running `spark-submit`
to the url of the MesosClusterDispatcher (e.g: mesos://dispatcher:7077). You can view driver statuses on the
Spark cluster Web UI.
Note that jars or python files that are passed to spark-submit should be URIs reachable by Mesos slaves.
# Mesos Run Modes
Spark can run over Mesos in two modes: "fine-grained" (default) and "coarse-grained".
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment