From e34b4e12673fb76c92f661d7c03527410857a0f8 Mon Sep 17 00:00:00 2001
From: Charles Allen <charles@allen-net.com>
Date: Tue, 1 Nov 2016 13:14:17 +0000
Subject: [PATCH] [SPARK-15994][MESOS] Allow enabling Mesos fetch cache in
 coarse executor backend

Mesos 0.23.0 introduces a Fetch Cache feature
(http://mesos.apache.org/documentation/latest/fetcher/) which allows
caching of resources specified in command URIs.

This patch:

- Updates the Mesos shaded protobuf dependency to 0.23.0
- Allows setting `spark.mesos.fetcherCache.enable` to enable the fetch
  cache for all specified URIs. (URIs must be specified for the setting
  to have any effect.)
- Updates the Mesos configuration documentation with the new setting.

This patch does NOT:

- Allow for per-URI caching configuration. The cache setting is global
  to ALL URIs for the command.

Author: Charles Allen <charles@allen-net.com>

Closes #13713 from drcrallen/SPARK15994.
---
 docs/running-on-mesos.md                           |  9 ++++--
 .../cluster/mesos/MesosClusterScheduler.scala      |  3 +-
 .../MesosCoarseGrainedSchedulerBackend.scala       |  6 ++--
 .../cluster/mesos/MesosSchedulerUtils.scala        |  6 ++--
 ...osCoarseGrainedSchedulerBackendSuite.scala      | 28 +++++++++++++++++++
 5 files changed, 45 insertions(+), 7 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 77b06fcf33..923d8dbebf 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for information on Spark config
     since this configuration is just a upper limit and not a guaranteed amount.
   </td>
 </tr>
-
-
+<tr>
+  <td><code>spark.mesos.fetcherCache.enable</code></td>
+  <td><code>false</code></td>
+  <td>
+    If set to <code>true</code>, all URIs (example: <code>spark.executor.uri</code>, <code>spark.mesos.uris</code>) will be cached by the [Mesos Fetcher Cache](http://mesos.apache.org/documentation/latest/fetcher/).
+  </td>
+</tr>
 </table>

 # Troubleshooting and Debugging

diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0b45499777..635712c00d 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
   private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
   private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
   private val maxRetryWaitTime = conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val useFetchCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
   private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
     val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")

     ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
-      CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+      CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
   }

   private def getDriverCommandValue(desc: MesosDriverDescription): String = {

diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e67bf3e328..5063c1fe98 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
   val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

+  val useFetcherCache = conf.getBoolean("spark.mesos.fetcherCache.enable", false)
+
   val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

   private[this] val shutdownTimeoutMS =
@@ -226,10 +228,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         s" --hostname ${offer.getHostname}" +
         s" --cores $numCores" +
         s" --app-id $appId")
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
+      command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }

-    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))
+    conf.getOption("spark.mesos.uris").foreach(setupUris(_, command, useFetcherCache))

     command.build()
   }

diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 73cc241239..9cb6023704 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -369,9 +369,11 @@ trait MesosSchedulerUtils extends Logging {
     sc.executorMemory
   }

-  def setupUris(uris: String, builder: CommandInfo.Builder): Unit = {
+  def setupUris(uris: String,
+                builder: CommandInfo.Builder,
+                useFetcherCache: Boolean = false): Unit = {
     uris.split(",").foreach { uri =>
-      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()))
+      builder.addUris(CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetcherCache))
     }
   }

diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 75ba02e470..f73638fda6 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -463,6 +463,34 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
   }

+  test("mesos supports setting fetcher cache") {
+    val url = "spark.spark.spark.com"
+    setBackend(Map(
+      "spark.mesos.fetcherCache.enable" -> "true",
+      "spark.executor.uri" -> url
+    ), false)
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val uris = launchedTasks.head.getCommand.getUrisList
+    assert(uris.size() == 1)
+    assert(uris.asScala.head.getCache)
+  }
+
+  test("mesos supports disabling fetcher cache") {
+    val url = "spark.spark.spark.com"
+    setBackend(Map(
+      "spark.mesos.fetcherCache.enable" -> "false",
+      "spark.executor.uri" -> url
+    ), false)
+    val offers = List(Resources(backend.executorMemory(sc), 1))
+    offerResources(offers)
+    val launchedTasks = verifyTaskLaunched(driver, "o1")
+    val uris = launchedTasks.head.getCommand.getUrisList
+    assert(uris.size() == 1)
+    assert(!uris.asScala.head.getCache)
+  }
+
   private case class Resources(mem: Int, cpus: Int, gpus: Int = 0)

   private def verifyDeclinedOffer(driver: SchedulerDriver,
--
GitLab
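
For anyone trying the setting out, here is a minimal sketch of enabling the fetcher cache from application code. The app name, master URL, and executor URI below are placeholders rather than values from this patch, and caching only matters when at least one URI is configured:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FetcherCacheDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fetcher-cache-demo")             // placeholder app name
      .setMaster("mesos://zk://zk-host:2181/mesos") // placeholder Mesos master
      // The new flag: every URI attached to the executor CommandInfo
      // (spark.executor.uri, spark.mesos.uris) is marked cacheable.
      .set("spark.mesos.fetcherCache.enable", "true")
      // Placeholder URI; without any URIs the flag has no effect.
      .set("spark.executor.uri", "http://repo-host/spark-bin.tgz")

    val sc = new SparkContext(conf)
    try {
      // ... application code ...
    } finally {
      sc.stop()
    }
  }
}
```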
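Mechanically, the whole feature is one protobuf field: `CommandInfo.URI.cache`, introduced in Mesos 0.23.0. The following standalone sketch mirrors what the patched `setupUris` does; the object wrapper, `main` method, and sample URIs are illustrative only, since the real method lives in the `MesosSchedulerUtils` trait:

```scala
import org.apache.mesos.Protos.CommandInfo
import scala.collection.JavaConverters._

object SetupUrisSketch {
  // Split a comma-separated URI list and attach each entry to the command,
  // tagging every URI with the same fetcher-cache flag.
  def setupUris(uris: String,
                builder: CommandInfo.Builder,
                useFetcherCache: Boolean = false): Unit = {
    uris.split(",").foreach { uri =>
      builder.addUris(
        CommandInfo.URI.newBuilder()
          .setValue(uri.trim())
          // cache=true tells the Mesos fetcher it may reuse a previous
          // download of this URI instead of fetching it again.
          .setCache(useFetcherCache))
    }
  }

  def main(args: Array[String]): Unit = {
    val command = CommandInfo.newBuilder()
    setupUris("http://host/a.tgz, http://host/b.jar", command, useFetcherCache = true)
    command.getUrisList.asScala.foreach { u =>
      println(s"${u.getValue} cache=${u.getCache}")
    }
  }
}
```

Because the `cache` field is set per URI, a follow-up change could expose per-URI control; as the commit message notes, this patch deliberately applies one global flag to every URI on the command.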