Commit 8aab94d8 authored by Iulian Dragos, committed by Andrew Or

[SPARK-4286] Add an external shuffle service that can be run as a daemon.

This allows Mesos deployments to use the shuffle service (and implicitly dynamic allocation). It does so by adding a new "main" class and two corresponding scripts in `sbin`:

- `sbin/start-shuffle-service.sh`
- `sbin/stop-shuffle-service.sh`

Specific options can be passed in `SPARK_SHUFFLE_OPTS`.
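
For example, a hypothetical session (illustrative values; 7337 is the default of `spark.shuffle.service.port`):

```bash
# Pass service-specific JVM properties via SPARK_SHUFFLE_OPTS,
# then start and later stop the daemon with the new scripts.
export SPARK_SHUFFLE_OPTS="-Dspark.shuffle.service.port=7337"
./sbin/start-shuffle-service.sh
# ... executors now fetch shuffle blocks from this service ...
./sbin/stop-shuffle-service.sh
```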

This is picking up work from #3861 /cc tnachen

Author: Iulian Dragos <jaguarul@gmail.com>

Closes #4990 from dragos/feature/external-shuffle-service and squashes the following commits:

6c2b148 [Iulian Dragos] Import order and wrong name fixup.
07804ad [Iulian Dragos] Moved ExternalShuffleService to the `deploy` package + other minor tweaks.
4dc1f91 [Iulian Dragos] Reviewer’s comments:
8145429 [Iulian Dragos] Add an external shuffle service that can be run as a daemon.
parent 52ccf1d3
@@ -3,7 +3,7 @@
# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching programs locally with
# ./bin/run-example or ./bin/spark-submit
# - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
@@ -39,6 +39,7 @@
# - SPARK_WORKER_DIR, to set the working directory of worker processes
# - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers
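
To make these settings permanent on a node, the same variables can live in `conf/spark-env.sh`, which `load-spark-env.sh` sources before the daemon starts (an illustrative sketch; both values are arbitrary):

```bash
# conf/spark-env.sh: sourced by the daemon scripts at startup
SPARK_SHUFFLE_OPTS="-Dspark.shuffle.service.port=7337"
SPARK_DAEMON_MEMORY=1g   # heap for daemons, including the shuffle service
```
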
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.deploy.worker
+package org.apache.spark.deploy

import java.util.concurrent.CountDownLatch
import org.apache.spark.{Logging, SparkConf, SecurityManager}
import org.apache.spark.network.TransportContext
@@ -23,6 +25,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.sasl.SaslRpcHandler
import org.apache.spark.network.server.TransportServer
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
import org.apache.spark.util.Utils
/**
* Provides a server from which Executors can read shuffle files (rather than reading directly from
@@ -31,8 +34,8 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
*
* Optionally requires SASL authentication in order to read. See [[SecurityManager]].
*/
-private[worker]
-class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
+private[deploy]
+class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityManager)
  extends Logging {

  private val enabled = sparkConf.getBoolean("spark.shuffle.service.enabled", false)
@@ -51,16 +54,58 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
  /** Starts the external shuffle service if the user has configured us to. */
  def startIfEnabled() {
    if (enabled) {
-      require(server == null, "Shuffle server already started")
-      logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
-      server = transportContext.createServer(port)
+      start()
    }
  }

  /** Start the external shuffle service */
  def start() {
    require(server == null, "Shuffle server already started")
    logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl")
    server = transportContext.createServer(port)
  }

  def stop() {
-    if (enabled && server != null) {
+    if (server != null) {
      server.close()
      server = null
    }
  }
}
/**
 * A main class for running the external shuffle service.
 */
object ExternalShuffleService extends Logging {
  @volatile
  private var server: ExternalShuffleService = _

  private val barrier = new CountDownLatch(1)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf
    Utils.loadDefaultSparkProperties(sparkConf)
    val securityManager = new SecurityManager(sparkConf)

    // we override this value since this service is started from the command line
    // and we assume the user really wants it to be running
    sparkConf.set("spark.shuffle.service.enabled", "true")
    server = new ExternalShuffleService(sparkConf, securityManager)
    server.start()

    installShutdownHook()

    // keep running until the process is terminated
    barrier.await()
  }

  private def installShutdownHook(): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread("External Shuffle Service shutdown thread") {
      override def run() {
        logInfo("Shutting down shuffle service.")
        server.stop()
        barrier.countDown()
      }
    })
  }
}
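
A minimal sketch of what launching this main class amounts to (assuming a built Spark distribution; the `sbin` scripts below wrap this via `spark-daemon.sh`):

```bash
# spark-class builds the JVM command line, applying SPARK_DAEMON_JAVA_OPTS,
# SPARK_SHUFFLE_OPTS and SPARK_DAEMON_MEMORY (see the launcher change below).
./bin/spark-class org.apache.spark.deploy.ExternalShuffleService
```
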
@@ -34,6 +34,7 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.{Command, ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.ExternalShuffleService
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
@@ -61,7 +62,7 @@ private[worker] class Worker(
  assert (port > 0)

  // For worker and executor IDs
  private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")

  // Send a heartbeat every (heartbeat timeout) / 4 milliseconds
  private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
@@ -85,10 +86,10 @@ private[worker] class Worker(
  private val CLEANUP_ENABLED = conf.getBoolean("spark.worker.cleanup.enabled", false)
  // How often worker will clean up old app folders
  private val CLEANUP_INTERVAL_MILLIS =
    conf.getLong("spark.worker.cleanup.interval", 60 * 30) * 1000
  // TTL for app folders/data; after TTL expires it will be cleaned up
  private val APP_DATA_RETENTION_SECS =
    conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

  private val testing: Boolean = sys.props.contains("spark.testing")
@@ -112,7 +113,7 @@ private[worker] class Worker(
  } else {
    new File(sys.env.get("SPARK_HOME").getOrElse("."))
  }

  var workDir: File = null
  val finishedExecutors = new HashMap[String, ExecutorRunner]
  val drivers = new HashMap[String, DriverRunner]
@@ -122,7 +123,7 @@ private[worker] class Worker(
  val finishedApps = new HashSet[String]

  // The shuffle service is not actually started unless configured.
-  private val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
+  private val shuffleService = new ExternalShuffleService(conf, securityMgr)

  private val publicAddress = {
    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
@@ -134,7 +135,7 @@ private[worker] class Worker(
  private val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf, securityMgr)
  private val workerSource = new WorkerSource(this)

  private var registrationRetryTimer: Option[Cancellable] = None

  var coresUsed = 0
@@ -32,7 +32,7 @@ Resource allocation can be configured as follows, based on the cluster type:
* **Standalone mode:** By default, applications submitted to the standalone mode cluster will run in
  FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit
  the number of nodes an application uses by setting the `spark.cores.max` configuration property in it,
  or change the default for applications that don't set this setting through `spark.deploy.defaultCores`
  (see the sketch after this excerpt). Finally, in addition to controlling cores, each application's
  `spark.executor.memory` setting controls its memory use.
* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
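
An illustrative sketch of those standalone-mode knobs (values are arbitrary; `spark.deploy.defaultCores` is read by the master, so it is typically set via `SPARK_MASTER_OPTS` on the master):

```bash
# Cap one application at 4 cores and 2g per executor
# (the master URL and your-app.jar are placeholders).
./bin/spark-submit \
  --master spark://master:7077 \
  --conf spark.cores.max=4 \
  --conf spark.executor.memory=2g \
  your-app.jar

# Default core cap for applications that don't set spark.cores.max:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=2"
```
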
@@ -69,6 +69,10 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder {
    } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
      javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
      memKey = "SPARK_EXECUTOR_MEMORY";
    } else if (className.equals("org.apache.spark.deploy.ExternalShuffleService")) {
      javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
      javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
      memKey = "SPARK_DAEMON_MEMORY";
    } else if (className.startsWith("org.apache.spark.tools.")) {
      String sparkHome = getSparkHome();
      File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
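
Illustratively, with `SPARK_DAEMON_MEMORY=1g` and `SPARK_SHUFFLE_OPTS="-Dspark.shuffle.service.port=7337"` in the environment, the builder would assemble a command along these lines (a sketch, not the literal output):

```bash
java -cp "<spark assembly classpath>" -Xmx1g \
  -Dspark.shuffle.service.port=7337 \
  org.apache.spark.deploy.ExternalShuffleService
```
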
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Starts the external shuffle service on the machine this script is executed on.
#
# Usage: start-shuffle-service.sh
#
# Use the SPARK_SHUFFLE_OPTS environment variable to set shuffle service configuration.
#
sbin="`dirname "$0"`"
sbin="`cd "$sbin"; pwd`"
. "$sbin/spark-config.sh"
. "$SPARK_PREFIX/bin/load-spark-env.sh"
exec "$sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Stops the external shuffle service on the machine this script is executed on.
sbin="`dirname "$0"`"
sbin="`cd "$sbin"; pwd`"
"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.ExternalShuffleService 1