diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 39b0f4d0e2c0e21fce74107a9b0119dc25be17e8..1e9644d06e1d0a0ddd75db9c5e627f44e36abdf4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -537,16 +537,10 @@ private[spark] class MesosClusterScheduler(
       .addAllResources(memResourcesToUse.asJava)
     offer.resources = finalResources.asJava
     submission.schedulerProperties.get("spark.mesos.executor.docker.image").foreach { image =>
-      val container = taskInfo.getContainerBuilder()
-      val volumes = submission.schedulerProperties
-        .get("spark.mesos.executor.docker.volumes")
-        .map(MesosSchedulerBackendUtil.parseVolumesSpec)
-      val portmaps = submission.schedulerProperties
-        .get("spark.mesos.executor.docker.portmaps")
-        .map(MesosSchedulerBackendUtil.parsePortMappingsSpec)
-      MesosSchedulerBackendUtil.addDockerInfo(
-        container, image, volumes = volumes, portmaps = portmaps)
-      taskInfo.setContainer(container.build())
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        submission.schedulerProperties.get,
+        taskInfo.getContainerBuilder())
     }
     val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
     queuedTasks += taskInfo.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 99e6d3958374791a0c6b9a60287e29a78b6482ad..52993caad1aa992143753079bb9ed8a2e76018ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -408,8 +408,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .addAllResources(memResourcesToUse.asJava)
 
         sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-          MesosSchedulerBackendUtil
-            .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder)
+          MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+            image,
+            sc.conf.getOption,
+            taskBuilder.getContainerBuilder
+          )
         }
 
         tasks(offer.getId) ::= taskBuilder.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e08dc3b5957bbf8109d665c98f079f682a7fe267..8d4fc9eed7af765fdbdb0c6a044efc2aeefaa4d5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -151,8 +151,11 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       .setData(ByteString.copyFrom(createExecArg()))
 
    sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
-      MesosSchedulerBackendUtil
-        .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
+      MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
+        image,
+        sc.conf.getOption,
+        executorInfo.getContainerBuilder()
+      )
    }
 
    (executorInfo.build(), resourcesAfterMem.asJava)
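All three call sites above now hand the helper a plain `String => Option[String]` accessor rather than a `SparkConf`, which is what lets the cluster dispatcher (whose settings live in `submission.schedulerProperties`, a `Map[String, String]`) share the same code path as the two backends. A minimal sketch of why both shapes fit, assuming it sits inside the `org.apache.spark.scheduler.cluster.mesos` package (the helper is `private[mesos]`); the object name, image, and property values here are illustrative, not from the patch:

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.mesos.Protos.ContainerInfo

    object DockerInfoSketch {
      def main(args: Array[String]): Unit = {
        // Map#get already has the shape String => Option[String], matching the
        // new `conf` parameter; SparkConf#getOption conforms the same way.
        val props = Map(
          "spark.mesos.executor.docker.forcePullImage" -> "true",
          "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp")

        val builder = ContainerInfo.newBuilder()
        MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
          "example/image:latest", props.get, builder)

        // The flag parsed from the properties should land on the builder's DockerInfo.
        assert(builder.getDocker.getForcePullImage)
      }
    }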
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index 05b2b089440980c12a8ea774edaf2d2b878700b1..aa669f01bd60785e68050b1e82b02d5cc97024b4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos
 import org.apache.mesos.Protos.{ContainerInfo, Volume}
 import org.apache.mesos.Protos.ContainerInfo.DockerInfo
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 
 /**
@@ -105,11 +104,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   def addDockerInfo(
       container: ContainerInfo.Builder,
       image: String,
+      forcePullImage: Boolean = false,
       volumes: Option[List[Volume]] = None,
       network: Option[ContainerInfo.DockerInfo.Network] = None,
       portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = {
 
-    val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image)
+    val docker = ContainerInfo.DockerInfo.newBuilder()
+      .setImage(image)
+      .setForcePullImage(forcePullImage)
 
     network.foreach(docker.setNetwork)
     portmaps.foreach(_.foreach(docker.addPortMappings))
@@ -119,21 +121,23 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   }
 
   /**
-   * Setup a docker containerizer
+   * Set up a docker containerizer from MesosDriverDescription scheduler properties
    */
   def setupContainerBuilderDockerInfo(
     imageName: String,
-    conf: SparkConf,
+    conf: String => Option[String],
     builder: ContainerInfo.Builder): Unit = {
-    val volumes = conf
-      .getOption("spark.mesos.executor.docker.volumes")
+    val forcePullImage = conf("spark.mesos.executor.docker.forcePullImage")
+      .exists(_.equals("true"))
+    val volumes = conf("spark.mesos.executor.docker.volumes")
       .map(parseVolumesSpec)
-    val portmaps = conf
-      .getOption("spark.mesos.executor.docker.portmaps")
+    val portmaps = conf("spark.mesos.executor.docker.portmaps")
       .map(parsePortMappingsSpec)
+
     addDockerInfo(
       builder,
       imageName,
+      forcePullImage = forcePullImage,
       volumes = volumes,
       portmaps = portmaps)
     logDebug("setupContainerDockerInfo: using docker image: " + imageName)
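Two behavioral details in the helper are worth noting: the flag is parsed with `.exists(_.equals("true"))`, so any other value (absent, `TRUE`, `1`) leaves force pull off; and `addDockerInfo` defaults `forcePullImage` to `false`, matching the proto field's own default, so existing callers are unaffected. Below is a hand-written sketch (not code from this patch) of the protobuf the helper ultimately assembles; it assumes the Mesos 0.22+ protos picked up by the dependency bump later in the patch:

    import org.apache.mesos.Protos.ContainerInfo
    import org.apache.mesos.Protos.ContainerInfo.DockerInfo

    object ProtoSketch extends App {
      // What the helper assembles: a DOCKER-typed ContainerInfo whose
      // DockerInfo carries the image and the new force-pull flag.
      val docker = DockerInfo.newBuilder()
        .setImage("example/image:latest")
        .setForcePullImage(true) // field defaults to false in the proto
        .build()

      val container = ContainerInfo.newBuilder()
        .setType(ContainerInfo.Type.DOCKER)
        .setDocker(docker)
        .build()

      println(container)
    }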
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index c2779d7b3565a8689ca2ccddf7b91c80d012ba55..51d262e75ef3adf82c6ca4edd23089dbf43ad59b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -252,6 +252,69 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.start()
   }
 
+  test("docker settings are reflected in created tasks") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image",
+      "spark.mesos.executor.docker.forcePullImage" -> "true",
+      "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
+      "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val volumes = containerInfo.getVolumesList.asScala
+    assert(volumes.size == 1)
+
+    val volume = volumes.head
+    assert(volume.getHostPath == "/host_vol")
+    assert(volume.getContainerPath == "/container_vol")
+    assert(volume.getMode == Volume.Mode.RO)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(dockerInfo.getForcePullImage)
+
+    val portMappings = dockerInfo.getPortMappingsList.asScala
+    assert(portMappings.size == 1)
+
+    val portMapping = portMappings.head
+    assert(portMapping.getHostPort == 8080)
+    assert(portMapping.getContainerPort == 80)
+    assert(portMapping.getProtocol == "tcp")
+  }
+
+  test("force-pull-image option is disabled by default") {
+    setBackend(Map(
+      "spark.mesos.executor.docker.image" -> "some_image"
+    ))
+
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+    val offer1 = createOffer("o1", "s1", mem, cpu)
+    backend.resourceOffers(driver, List(offer1).asJava)
+
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    assert(launchedTasks.size == 1)
+
+    val containerInfo = launchedTasks.head.getContainer
+    assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
+
+    val dockerInfo = containerInfo.getDocker
+
+    assert(dockerInfo.getImage == "some_image")
+    assert(!dockerInfo.getForcePullImage)
+  }
+
   private def verifyDeclinedOffer(driver: SchedulerDriver,
       offerId: OfferID,
       filter: Boolean = false): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index 41693b1191a3c10be97e809cd071263e64a67be2..fcf39f63915f7a520e53a65c3570e1a9283ee182 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -150,6 +150,7 @@ class MesosFineGrainedSchedulerBackendSuite
 
     val conf = new SparkConf()
       .set("spark.mesos.executor.docker.image", "spark/mock")
+      .set("spark.mesos.executor.docker.forcePullImage", "true")
       .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
       .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
 
@@ -169,6 +170,7 @@ class MesosFineGrainedSchedulerBackendSuite
     val (execInfo, _) = backend.createExecutorInfo(
       Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
     assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
+    assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
     val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
     assert(portmaps.get(0).getHostPort.equals(80))
     assert(portmaps.get(0).getContainerPort.equals(8080))
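For reference, the spec strings these tests feed to the existing (unchanged) parsers appear to follow `[host_path:]container_path[:ro|:rw]` for volumes and `host_port:container_port[:tcp|:udp]` for port mappings, comma-separated for multiple entries; this is inferred from the assertions above, not stated in the patch. A small sketch, again assuming `private[mesos]` package scope and an illustrative object name:

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.mesos.Protos.Volume

    object SpecSketch extends App {
      // "/host_vol:/container_vol:ro" -> one volume, host path mounted read-only.
      val volumes = MesosSchedulerBackendUtil.parseVolumesSpec("/host_vol:/container_vol:ro")
      assert(volumes.head.getMode == Volume.Mode.RO)

      // "8080:80:tcp" -> host port 8080 forwarded to container port 80 over tcp.
      val portmaps = MesosSchedulerBackendUtil.parsePortMappingsSpec("8080:80:tcp")
      assert(portmaps.head.getContainerPort == 80)
    }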
index d16f42a97d37052389cc2b31b6b7ee0c6162623a..2b5764f8686907e3e1f4692d9c6d94757319f996 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 2e261cb9a543274d0a574bb41b0ffb02f638a810..3f53fdb09c64acbe194283548778012e663984e0 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -122,7 +122,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 67f38f4c220de1b5758abad17a71047faac92fa0..d3a7ab8bb457dcbb99828873f00c7239ef67308c 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -130,7 +130,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 07583963d913b76a12a5faf3c9d6a179d17710aa..05317a044d65e40fcbb55075e0cd8c8356ca11fb 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -131,7 +131,7 @@ libthrift-0.9.2.jar
 log4j-1.2.17.jar
 lz4-1.3.0.jar
 mail-1.4.7.jar
-mesos-0.21.1-shaded-protobuf.jar
+mesos-0.22.2-shaded-protobuf.jar
 metrics-core-3.1.2.jar
 metrics-graphite-3.1.2.jar
 metrics-json-3.1.2.jar
diff --git a/docs/_config.yml b/docs/_config.yml
index be3d8a2fe6022bde20c47cf21bfb5b7270131143..bbb576e0e7bb3b84b0c7cf1e4d2ae521b7494fdf 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -18,6 +18,6 @@ SPARK_VERSION: 2.1.0-SNAPSHOT
 SPARK_VERSION_SHORT: 2.1.0
 SCALA_BINARY_VERSION: "2.11"
 SCALA_VERSION: "2.11.7"
-MESOS_VERSION: 0.21.0
+MESOS_VERSION: 0.22.0
 SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
 SPARK_GITHUB_URL: https://github.com/apache/spark
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 10dc9ce890041c584c9cb70c65aac2160a5d1547..ce888b544516e1549f83fd59e90c18e1082acfd5 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -260,6 +260,10 @@ have Mesos download Spark via the usual methods.
 
 Requires Mesos version 0.20.1 or later.
 
+Note that, by default, Mesos agents will not pull the image if it already exists on the agent. If you use mutable image
+tags, you can set `spark.mesos.executor.docker.forcePullImage` to `true` to force the agent to always pull the
+image before running the executor. Force pulling images is only available in Mesos version 0.22 and above.
+
 # Running Alongside Hadoop
 
 You can run Spark and Mesos alongside your existing Hadoop cluster by just launching them as a
@@ -334,6 +338,14 @@ See the [configuration page](configuration.html) for information on Spark config
     the installed path of the Mesos library can be specified with <code>spark.executorEnv.MESOS_NATIVE_JAVA_LIBRARY</code>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.docker.forcePullImage</code></td>
+  <td>false</td>
+  <td>
+    Force Mesos agents to pull the image specified in <code>spark.mesos.executor.docker.image</code>.
+    By default, Mesos agents will not pull images they already have cached.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
diff --git a/pom.xml b/pom.xml
index d064cb57dd6cf3e9f13cebf8d941af875dae0a03..b69292d1880905e1130ed593ce926aeb2fefc530 100644
--- a/pom.xml
+++ b/pom.xml
@@ -119,7 +119,7 @@
     <java.version>1.7</java.version>
     <maven.version>3.3.9</maven.version>
     <sbt.project.name>spark</sbt.project.name>
-    <mesos.version>0.21.1</mesos.version>
+    <mesos.version>0.22.2</mesos.version>
     <mesos.classifier>shaded-protobuf</mesos.classifier>
     <slf4j.version>1.7.16</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
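Finally, a hedged end-user sketch of the new setting; the master URL, app name, and image are placeholders, and the same pair of settings can equally be passed as `--conf` flags to `spark-submit`:

    import org.apache.spark.{SparkConf, SparkContext}

    object ForcePullDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("mesos://host:5050") // placeholder master URL
          .setAppName("force-pull-demo")
          .set("spark.mesos.executor.docker.image", "example/spark:latest")
          // With a mutable tag such as :latest, make agents re-pull the image
          // instead of reusing a cached copy (needs Mesos 0.22+):
          .set("spark.mesos.executor.docker.forcePullImage", "true")

        val sc = new SparkContext(conf)
        try {
          println(sc.parallelize(1 to 10).sum())
        } finally {
          sc.stop()
        }
      }
    }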