diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 8a05628c8fcf2f10ac5aa2b8e54f1ca78dd83caf..df17dacdefaafa269934d949e963e7d787f6b0a6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -70,6 +70,11 @@ public class YarnShuffleService extends AuxiliaryService { private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb"; + // Whether failure during service initialization should stop the NM. + @VisibleForTesting + static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure"; + private static final boolean DEFAULT_STOP_ON_FAILURE = false; + // An entity that manages the shuffle secret per application // This is used only if authentication is enabled private ShuffleSecretManager secretManager; @@ -119,44 +124,50 @@ public class YarnShuffleService extends AuxiliaryService { * Start the shuffle server with the given configuration. */ @Override - protected void serviceInit(Configuration conf) { + protected void serviceInit(Configuration conf) throws Exception { _conf = conf; - // In case this NM was killed while there were running spark applications, we need to restore - // lost state for the existing executors. We look for an existing file in the NM's local dirs. - // If we don't find one, then we choose a file to use to save the state next time. Even if - // an application was stopped while the NM was down, we expect yarn to call stopApplication() - // when it comes back - registeredExecutorFile = - new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); - - TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); - // If authentication is enabled, set up the shuffle server to use a - // special RPC handler that filters out unauthenticated fetch requests - boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + boolean stopOnFailure = conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE); + try { + // In case this NM was killed while there were running spark applications, we need to restore + // lost state for the existing executors. We look for an existing file in the NM's local dirs. + // If we don't find one, then we choose a file to use to save the state next time. Even if + // an application was stopped while the NM was down, we expect yarn to call stopApplication() + // when it comes back + registeredExecutorFile = + new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); + + TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); - } catch (Exception e) { - logger.error("Failed to initialize external shuffle service", e); - } - List<TransportServerBootstrap> bootstraps = Lists.newArrayList(); - if (authEnabled) { - secretManager = new ShuffleSecretManager(); - bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); - } + // If authentication is enabled, set up the shuffle server to use a + // special RPC handler that filters out unauthenticated fetch requests + boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); + List<TransportServerBootstrap> bootstraps = Lists.newArrayList(); + if (authEnabled) { + secretManager = new ShuffleSecretManager(); + bootstraps.add(new SaslServerBootstrap(transportConf, secretManager)); + } - int port = conf.getInt( - SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); - TransportContext transportContext = new TransportContext(transportConf, blockHandler); - shuffleServer = transportContext.createServer(port, bootstraps); - // the port should normally be fixed, but for tests its useful to find an open port - port = shuffleServer.getPort(); - boundPort = port; - String authEnabledString = authEnabled ? "enabled" : "not enabled"; - logger.info("Started YARN shuffle service for Spark on port {}. " + - "Authentication is {}. Registered executor file is {}", port, authEnabledString, - registeredExecutorFile); + int port = conf.getInt( + SPARK_SHUFFLE_SERVICE_PORT_KEY, DEFAULT_SPARK_SHUFFLE_SERVICE_PORT); + TransportContext transportContext = new TransportContext(transportConf, blockHandler); + shuffleServer = transportContext.createServer(port, bootstraps); + // the port should normally be fixed, but for tests its useful to find an open port + port = shuffleServer.getPort(); + boundPort = port; + String authEnabledString = authEnabled ? "enabled" : "not enabled"; + logger.info("Started YARN shuffle service for Spark on port {}. " + + "Authentication is {}. Registered executor file is {}", port, authEnabledString, + registeredExecutorFile); + } catch (Exception e) { + if (stopOnFailure) { + throw e; + } else { + noteFailure(e); + } + } } @Override diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md index 40b6cd99cc27f47e9f96a1688a8cd2110ecc7df7..807944f20a78a7d14282418a5b81cfc589b89736 100644 --- a/docs/job-scheduling.md +++ b/docs/job-scheduling.md @@ -83,18 +83,7 @@ In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service. slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so through Marathon. -In YARN mode, start the shuffle service on each `NodeManager` as follows: - -1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a -pre-packaged distribution. -2. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under -`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under -`lib` if you are using a distribution. -2. Add this jar to the classpath of all `NodeManager`s in your cluster. -3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`, -then set `yarn.nodemanager.aux-services.spark_shuffle.class` to -`org.apache.spark.network.yarn.YarnShuffleService`. -4. Restart all `NodeManager`s in your cluster. +In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service). All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and `spark.shuffle.service.*` namespaces. For more detail, see the diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 4e92042da6870cb14a499f12938b4fb636e5f013..befd3eaee9d821c6acbc429cdb4846573f62f0f5 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -539,6 +539,37 @@ launch time. This is done by listing them in the `spark.yarn.access.namenodes` p spark.yarn.access.namenodes hdfs://ireland.example.org:8020/,hdfs://frankfurt.example.org:8020/ ``` +## Configuring the External Shuffle Service + +To start the Spark Shuffle Service on each `NodeManager` in your YARN cluster, follow these +instructions: + +1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a +pre-packaged distribution. +1. Locate the `spark-<version>-yarn-shuffle.jar`. This should be under +`$SPARK_HOME/common/network-yarn/target/scala-<version>` if you are building Spark yourself, and under +`lib` if you are using a distribution. +1. Add this jar to the classpath of all `NodeManager`s in your cluster. +1. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`, +then set `yarn.nodemanager.aux-services.spark_shuffle.class` to +`org.apache.spark.network.yarn.YarnShuffleService`. +1. Restart all `NodeManager`s in your cluster. + +The following extra configuration options are available when the shuffle service is running on YARN: + +<table class="table"> +<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> +<tr> + <td><code>spark.yarn.shuffle.stopOnFailure</code></td> + <td><code>false</code></td> + <td> + Whether to stop the NodeManager when there's a failure in the Spark Shuffle Service's + initialization. This prevents application failures caused by running containers on + NodeManagers where the Spark Shuffle Service is not running. + </td> +</tr> +</table> + ## Launching your application with Apache Oozie Apache Oozie can launch Spark applications as part of a workflow. diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 5458fb9d2e75e291bf6d2b8d7694f61ba9f0a3de..e123e78541048b5f4b5d0f97a5ac1b83de33dd97 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -16,13 +16,17 @@ */ package org.apache.spark.network.yarn -import java.io.{DataOutputStream, File, FileOutputStream} +import java.io.{DataOutputStream, File, FileOutputStream, IOException} +import java.nio.file.Files +import java.nio.file.attribute.PosixFilePermission._ +import java.util.EnumSet import scala.annotation.tailrec import scala.concurrent.duration._ import scala.language.postfixOps import org.apache.hadoop.fs.Path +import org.apache.hadoop.service.ServiceStateException import org.apache.hadoop.yarn.api.records.ApplicationId import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext} @@ -45,7 +49,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd classOf[YarnShuffleService].getCanonicalName) yarnConfig.setInt("spark.shuffle.service.port", 0) val localDir = Utils.createTempDir() - yarnConfig.set("yarn.nodemanager.local-dirs", localDir.getAbsolutePath) + yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath) } var s1: YarnShuffleService = null @@ -316,4 +320,28 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s2.stop() } - } + + test("service throws error if cannot start") { + // Create a different config with a read-only local dir. + val roConfig = new YarnConfiguration(yarnConfig) + val roDir = Utils.createTempDir() + Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE)) + roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath()) + roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true) + + // Try to start the shuffle service, it should fail. + val service = new YarnShuffleService() + + try { + val error = intercept[ServiceStateException] { + service.init(roConfig) + } + assert(error.getCause().isInstanceOf[IOException]) + } finally { + service.stop() + Files.setPosixFilePermissions(roDir.toPath(), + EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE)) + } + } + +}