Commit eca58755 authored by Timothy Chen, committed by Sean Owen

[SPARK-16927][SPARK-16923] Override task properties at dispatcher.

## What changes were proposed in this pull request?

- Enable setting default properties for all jobs submitted through the dispatcher [SPARK-16927]; the merge semantics are sketched below.
- Remove duplication of conf vars on cluster-submitted jobs [SPARK-16923] (a small fix, so it is included in the same PR).
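
For orientation, the SPARK-16927 change boils down to an ordinary map merge: defaults read from the dispatcher's own conf are laid under each submitted driver's conf, so per-job settings still win. A minimal standalone sketch of that semantics, with hypothetical property values:

```scala
// Sketch of the default/override merge the dispatcher performs (values are hypothetical).
object DriverDefaultMergeSketch extends App {
  // Defaults from the dispatcher's conf, with the
  // "spark.mesos.dispatcher.driverDefault." prefix already stripped:
  val defaultConf = Map("spark.executor.memory" -> "32g", "spark.cores.max" -> "4")

  // Conf supplied with one submitted driver:
  val driverConf = Map("spark.cores.max" -> "8")

  // In Map ++, the right-hand operand wins on key collisions,
  // so the driver's own settings override the dispatcher defaults.
  val merged = defaultConf ++ driverConf
  println(merged) // Map(spark.executor.memory -> 32g, spark.cores.max -> 8)
}
```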

## How was this patch tested?

- Mesos/Spark integration test suite
- Manual testing

Author: Timothy Chen <tnachen@gmail.com>

Closes #14511 from mgummelt/override-props.
Parent: bfda53f6
```diff
@@ -363,26 +363,21 @@ private[spark] class MesosClusterScheduler(
       .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
+  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
+    m.updated(k, f(m.getOrElse(k, default)))
+  }
+
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
     s"${frameworkId}-${desc.submissionId}"
   }
 
-  private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
-    m.updated(k, f(m.getOrElse(k, default)))
-  }
-
-  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
-    val env = {
-      val executorOpts = desc.conf.getAll.map { case (k, v) => s"-D$k=$v" }.mkString(" ")
-      val executorEnv = Map("SPARK_EXECUTOR_OPTS" -> executorOpts)
-      val driverEnv = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.")
-
-      var commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
-        v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
-      )
-
-      driverEnv ++ executorEnv ++ commandEnv
-    }
+  private def getDriverEnvironment(desc: MesosDriverDescription): Environment = {
+    // TODO(mgummelt): Don't do this here. This should be passed as a --conf
+    val commandEnv = adjust(desc.command.environment, "SPARK_SUBMIT_OPTS", "")(
+      v => s"$v -Dspark.mesos.driver.frameworkId=${getDriverFrameworkID(desc)}"
+    )
+
+    val env = desc.conf.getAllWithPrefix("spark.mesos.driverEnv.") ++ commandEnv
 
     val envBuilder = Environment.newBuilder()
     env.foreach { case (k, v) =>
```
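
Two things happen in the hunk above: the per-job conf is no longer duplicated into `SPARK_EXECUTOR_OPTS` (the SPARK-16923 fix), and the `adjust` helper moves to class scope. `adjust` simply rewrites one key of a map, applying `f` to the existing value or to `default` when the key is absent. A standalone illustration (using an immutable `Map` for brevity, where the real helper accepts any `collection.Map`):

```scala
// Standalone illustration of the adjust helper shown in the hunk above.
object AdjustSketch extends App {
  def adjust[A, B](m: Map[A, B], k: A, default: B)(f: B => B): Map[A, B] =
    m.updated(k, f(m.getOrElse(k, default)))

  val env = Map("SPARK_SUBMIT_OPTS" -> "-Xmx1g")
  // Key present: f is applied to the existing value.
  println(adjust(env, "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dspark.foo=bar"))
  // Key absent: f is applied to the default, which creates the entry.
  println(adjust(Map.empty[String, String], "SPARK_SUBMIT_OPTS", "")(v => s"$v -Dspark.foo=bar"))
}
```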
```diff
@@ -457,12 +452,6 @@ private[spark] class MesosClusterScheduler(
       "--driver-cores", desc.cores.toString,
       "--driver-memory", s"${desc.mem}M")
 
-    val replicatedOptionsBlacklist = Set(
-      "spark.jars", // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
-      "spark.master" // this contains the address of the dispatcher, not master
-    )
-
     // Assume empty main class means we're running python
     if (!desc.command.mainClass.equals("")) {
       options ++= Seq("--class", desc.command.mainClass)
```
```diff
@@ -480,9 +469,20 @@ private[spark] class MesosClusterScheduler(
         .mkString(",")
       options ++= Seq("--py-files", formattedFiles)
     }
-    desc.conf.getAll
+
+    // --conf
+    val replicatedOptionsBlacklist = Set(
+      "spark.jars", // Avoids duplicate classes in classpath
+      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      "spark.master" // this contains the address of the dispatcher, not master
+    )
+
+    val defaultConf = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
+    val driverConf = desc.conf.getAll
       .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-      .foreach { case (key, value) => options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+      .toMap
+    (defaultConf ++ driverConf).foreach { case (key, value) =>
+      options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+
     options
   }
```
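
The net effect of this hunk: every property in the merged map, minus the blacklisted keys, becomes a `--conf key=value` argument on the spark-submit command line, with the value shell-escaped. A rough sketch of that loop, where the `shellEscape` stand-in below is hypothetical and much simpler than Spark's real utility:

```scala
// Rough sketch of --conf option construction (shellEscape is a simplified stand-in).
object ConfOptionsSketch extends App {
  // Hypothetical stand-in: quote only values containing whitespace.
  def shellEscape(value: String): String =
    if (value.exists(_.isWhitespace)) "\"" + value + "\"" else value

  val merged = Map(
    "spark.executor.memory" -> "32g",
    "spark.executor.extraJavaOptions" -> "-XX:+UseG1GC -verbose:gc")

  var options = Seq("--driver-memory", "1024M")
  merged.foreach { case (key, value) =>
    options ++= Seq("--conf", s"$key=${shellEscape(value)}")
  }
  println(options.mkString(" "))
  // --driver-memory 1024M --conf spark.executor.memory=32g \
  //   --conf spark.executor.extraJavaOptions="-XX:+UseG1GC -verbose:gc"
}
```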
```diff
@@ -467,6 +467,17 @@ See the [configuration page](configuration.html) for information on Spark config
     Set the Spark Mesos dispatcher webui_url for interacting with the framework.
     If unset it will point to Spark's internal web UI.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.dispatcher.driverDefault.[PropertyName]</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Set default properties for drivers submitted through the
+    dispatcher. For example,
+    spark.mesos.dispatcher.driverDefault.spark.executor.memory=32g
+    results in the executors for all drivers submitted in cluster mode
+    running in 32g containers.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.dispatcher.historyServer.url</code></td>
```
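
Dispatcher-side, these defaults are picked up from the dispatcher's own SparkConf by stripping the `spark.mesos.dispatcher.driverDefault.` prefix, as the `getAllWithPrefix` call in the scheduler hunk suggests. A small sketch against the public SparkConf API, with a hypothetical value:

```scala
import org.apache.spark.SparkConf

// Sketch of extracting dispatcher-side defaults (the memory value is hypothetical).
object DriverDefaultPrefixSketch extends App {
  val conf = new SparkConf(loadDefaults = false)
    .set("spark.mesos.dispatcher.driverDefault.spark.executor.memory", "32g")

  // getAllWithPrefix returns matching entries with the prefix stripped,
  // leaving plain property names ready to become --conf pairs.
  val defaults = conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
  println(defaults) // Map(spark.executor.memory -> 32g)
}
```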