Skip to content
Snippets Groups Projects
Commit 0869b3a5 authored by Philipp Hoffmann's avatar Philipp Hoffmann Committed by Sean Owen
Browse files

[SPARK-15271][MESOS] Allow force pulling executor docker images

## What changes were proposed in this pull request?

This patch introduces a new configuration option, `spark.mesos.executor.docker.forcePullImage`, which makes Mesos agents force-pull the executor docker image instead of reusing a locally cached copy.

## How was this patch tested?

Unit tests were added to the Mesos scheduler backend suites covering the new setting both when it is enabled and in its default (disabled) state.

No UI changes are involved in this patch.

Mesos agents by default will not pull docker images which are cached
locally already. In order to run Spark executors from mutable tags like
`:latest` this commit introduces a Spark setting
(`spark.mesos.executor.docker.forcePullImage`). Setting this flag to
true will tell the Mesos agent to force pull the docker image (the default is `false`, which is consistent with the previous implementation and with Mesos' default behaviour).

Author: Philipp Hoffmann <mail@philipphoffmann.de>

Closes #14348 from philipphoffmann/force-pull-image.
parent a2abb583
No related branches found
No related tags found
No related merge requests found
Showing
with 110 additions and 29 deletions
......@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
.addAllResources(memResourcesToUse.asJava)
offer.resources = finalResources.asJava
submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
val container = taskInfo.getContainerBuilder()
val volumes = submission.schedulerProperties
.get("spark.mesos.executor.docker.volumes")
.map(MesosSchedulerBackendUtil.parseVolumesSpec)
val portmaps = submission.schedulerProperties
.get("spark.mesos.executor.docker.portmaps")
.map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
MesosSchedulerBackendUtil.addDockerInfo(
container, image, volumes = volumes, portmaps = portmaps)
taskInfo.setContainer(container.build())
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
submission.schedulerProperties.get,
taskInfo.getContainerBuilder())
}
val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
queuedTasks += taskInfo.build()
......
......@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.addAllResources(memResourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
taskBuilder.getContainerBuilder
)
}
tasks(offer.getId) ::= taskBuilder.build()
......
......@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
.setData(ByteString.copyFrom(createExecArg()))
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
image,
sc.conf.getOption,
executorInfo.getContainerBuilder()
)
}
(executorInfo.build(), resourcesAfterMem.asJava)
......
......@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
import org.apache.mesos.Protos.{ContainerInfo, Volume}
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
/**
......@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
def addDockerInfo(
container: ContainerInfo.Builder,
image: String,
forcePullImage: Boolean = false,
volumes: Option[List[Volume]] = None,
network: Option[ContainerInfo.DockerInfo.Network] = None,
portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
val docker = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
network.foreach(docker.setNetwork)
portmaps.foreach(_.foreach(docker.addPortMappings))
......@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}
/**
 * Set up a docker containerizer on the given `ContainerInfo` builder.
 *
 * NOTE(review): this span fused the pre-change and post-change diff lines
 * (duplicate `conf` parameter and duplicate `volumes`/`portmaps` vals); the
 * reconstructed post-change version is below.
 *
 * @param imageName docker image the executor should run in
 * @param conf lookup function for docker-related settings; passing a function
 *             (e.g. `sc.conf.getOption` or `schedulerProperties.get`) lets the
 *             cluster scheduler and the coarse/fine-grained backends share
 *             this helper regardless of where their configuration lives
 * @param builder the container builder the resulting DockerInfo is attached to
 */
def setupContainerBuilderDockerInfo(
    imageName: String,
    conf: String => Option[String],
    builder: ContainerInfo.Builder): Unit = {
  // Only the exact string "true" enables force-pulling; an absent or any other
  // value keeps Mesos' default behaviour of reusing locally cached images.
  val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
    .exists(_.equals("true"))
  val volumes = conf("spark.mesos.executor.docker.volumes")
    .map(parseVolumesSpec)
  val portmaps = conf("spark.mesos.executor.docker.portmaps")
    .map(parsePortMappingsSpec)

  addDockerInfo(
    builder,
    imageName,
    forcePullImage = forcePullImage,
    volumes = volumes,
    portmaps = portmaps)
  logDebug("setupContainerDockerInfo: using docker image: " + imageName)
}
......
......@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}
// Verifies that every docker-related setting (image, force-pull flag, volume
// spec and port mapping) is propagated into the ContainerInfo of the task
// that the backend launches for a matching resource offer.
test("docker settings are reflected in created tasks") {
  setBackend(Map(
    "spark.mesos.executor.docker.image" -> "some_image",
    "spark.mesos.executor.docker.forcePullImage" -> "true",
    "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
    "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
  ))

  val (mem, cpu) = (backend.executorMemory(sc), 4)
  val offer = createOffer("o1", "s1", mem, cpu)
  backend.resourceOffers(driver, List(offer).asJava)

  val launched = verifyTaskLaunched(driver, "o1")
  assert(launched.size == 1)

  val container = launched.head.getContainer
  assert(container.getType == ContainerInfo.Type.DOCKER)

  // Exactly one volume, parsed from the "/host_vol:/container_vol:ro" spec.
  val allVolumes = container.getVolumesList.asScala
  assert(allVolumes.size == 1)
  val vol = allVolumes.head
  assert(vol.getHostPath == "/host_vol")
  assert(vol.getContainerPath == "/container_vol")
  assert(vol.getMode == Volume.Mode.RO)

  val docker = container.getDocker
  assert(docker.getImage == "some_image")
  assert(docker.getForcePullImage)

  // Exactly one port mapping, parsed from the "8080:80:tcp" spec.
  val mappings = docker.getPortMappingsList.asScala
  assert(mappings.size == 1)
  val mapping = mappings.head
  assert(mapping.getHostPort == 8080)
  assert(mapping.getContainerPort == 80)
  assert(mapping.getProtocol == "tcp")
}
// Verifies backward compatibility: when only the docker image is configured,
// the launched task's DockerInfo must NOT request a force-pull, matching the
// pre-SPARK-15271 behaviour and Mesos' own default.
test("force-pull-image option is disabled by default") {
  setBackend(Map(
    "spark.mesos.executor.docker.image" -> "some_image"
  ))

  val (mem, cpu) = (backend.executorMemory(sc), 4)
  val offer = createOffer("o1", "s1", mem, cpu)
  backend.resourceOffers(driver, List(offer).asJava)

  val launched = verifyTaskLaunched(driver, "o1")
  assert(launched.size == 1)

  val container = launched.head.getContainer
  assert(container.getType == ContainerInfo.Type.DOCKER)

  val docker = container.getDocker
  assert(docker.getImage == "some_image")
  assert(!docker.getForcePullImage)
}
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
......
......@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
val conf = new SparkConf()
.set("spark.mesos.executor.docker.image", "spark/mock")
.set("spark.mesos.executor.docker.forcePullImage", "true")
.set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
.set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
......@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
val (execInfo, _) = backend.createExecutorInfo(
Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
assert(portmaps.get(0).getHostPort.equals(80))
assert(portmaps.get(0).getContainerPort.equals(8080))
......
......@@ -116,7 +116,7 @@ libfb303-0.9.2.jar
libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
......
......@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
......
......@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
......
......@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
......
......@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
log4j-1.2.17.jar
lz4-1.3.0.jar
mail-1.4.7.jar
mesos-0.21.1-shaded-protobuf.jar
mesos-0.22.2-shaded-protobuf.jar
metrics-core-3.1.2.jar
metrics-graphite-3.1.2.jar
metrics-json-3.1.2.jar
......
......@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
SPARK_VERSION_SHORT: 2.1.0
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.7"
MESOS_VERSION: 0.21.0
MESOS_VERSION: 0.22.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
......@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
Requires Mesos version 0.20.1 or later.
Note that by default Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
tags you can set `spark.mesos.executor.docker.forcePullImage` to `true` in order to force the agent to always pull the
image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
# Running Alongside Hadoop
You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
......@@ -334,6 +338,14 @@ See the [configuration page](configuration.html) for information on Spark config
the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.forcePullImage</code></td>
<td>false</td>
<td>
Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
By default Mesos agents will not pull images they already have cached.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.volumes</code></td>
<td>(none)</td>
......
......@@ -119,7 +119,7 @@
<java.version>1.7</java.version>
<maven.version>3.3.9</maven.version>
<sbt.project.name>spark</sbt.project.name>
<mesos.version>0.21.1</mesos.version>
<mesos.version>0.22.2</mesos.version>
<mesos.classifier>shaded-protobuf</mesos.classifier>
<slf4j.version>1.7.16</slf4j.version>
<log4j.version>1.2.17</log4j.version>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment