diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cd67eb28573e836141665041ed94766e9dd44e14..d8b2ed6b5dc7b202c4fddaca886bd454f0900a8d 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -160,7 +161,9 @@ public class YarnShuffleService extends AuxiliaryService {
     // If we don't find one, then we choose a file to use to save the state next time. Even if
     // an application was stopped while the NM was down, we expect yarn to call stopApplication()
     // when it comes back
-    registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+    if (_recoveryPath != null) {
+      registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+    }
 
     TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
     blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
@@ -170,7 +173,10 @@ public class YarnShuffleService extends AuxiliaryService {
     List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
     boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE);
     if (authEnabled) {
-      createSecretManager();
+      secretManager = new ShuffleSecretManager();
+      if (_recoveryPath != null) {
+        loadSecretsFromDb();
+      }
       bootstraps.add(new AuthServerBootstrap(transportConf, secretManager));
     }
 
@@ -194,13 +200,12 @@ public class YarnShuffleService extends AuxiliaryService {
     }
   }
 
-  private void createSecretManager() throws IOException {
-    secretManager = new ShuffleSecretManager();
+  private void loadSecretsFromDb() throws IOException {
     secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
 
     // Make sure this is protected in case its not in the NM recovery dir
     FileSystem fs = FileSystem.getLocal(_conf);
-    fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700));
+    fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700));
 
     db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper);
     logger.info("Recovery location is: " + secretsFile.getPath());
@@ -317,10 +322,10 @@ public class YarnShuffleService extends AuxiliaryService {
   }
 
   /**
-   * Set the recovery path for shuffle service recovery when NM is restarted. The method will be
-   * overrode and called when Hadoop version is 2.5+ and NM recovery is enabled, otherwise we
-   * have to manually call this to set our own recovery path.
+   * Set the recovery path for shuffle service recovery when NM is restarted. This will be called
+   * by the NM if NM recovery is enabled.
    */
+  @Override
   public void setRecoveryPath(Path recoveryPath) {
     _recoveryPath = recoveryPath;
   }
@@ -334,53 +339,44 @@ public class YarnShuffleService extends AuxiliaryService {
 
   /**
    * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
-   * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
-   * it will uses a YARN local dir.
+   * and the DB exists in the NM local dir left by an old version of the shuffle service.
    */
   protected File initRecoveryDb(String dbName) {
-    if (_recoveryPath != null) {
-      File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
-      if (recoveryFile.exists()) {
-        return recoveryFile;
-      }
+    Preconditions.checkNotNull(_recoveryPath,
+      "recovery path should not be null if NM recovery is enabled");
+
+    File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName);
+    if (recoveryFile.exists()) {
+      return recoveryFile;
     }
+
     // db doesn't exist in recovery path go check local dirs for it
     String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
     for (String dir : localDirs) {
       File f = new File(new Path(dir).toUri().getPath(), dbName);
       if (f.exists()) {
-        if (_recoveryPath == null) {
-          // If NM recovery is not enabled, we should specify the recovery path using NM local
-          // dirs, which is compatible with the old code.
-          _recoveryPath = new Path(dir);
-          return f;
-        } else {
-          // If the recovery path is set then either NM recovery is enabled or another recovery
-          // DB has been initialized. If NM recovery is enabled and had set the recovery path
-          // make sure to move all DBs to the recovery path from the old NM local dirs.
-          // If another DB was initialized first just make sure all the DBs are in the same
-          // location.
-          Path newLoc = new Path(_recoveryPath, dbName);
-          Path copyFrom = new Path(f.toURI());
-          if (!newLoc.equals(copyFrom)) {
-            logger.info("Moving " + copyFrom + " to: " + newLoc);
-            try {
-              // The move here needs to handle moving non-empty directories across NFS mounts
-              FileSystem fs = FileSystem.getLocal(_conf);
-              fs.rename(copyFrom, newLoc);
-            } catch (Exception e) {
-              // Fail to move recovery file to new path, just continue on with new DB location
-              logger.error("Failed to move recovery file {} to the path {}",
-                dbName, _recoveryPath.toString(), e);
-            }
+        // If the recovery path is set then either NM recovery is enabled or another recovery
+        // DB has been initialized. If NM recovery is enabled and had set the recovery path
+        // make sure to move all DBs to the recovery path from the old NM local dirs.
+        // If another DB was initialized first just make sure all the DBs are in the same
+        // location.
+        Path newLoc = new Path(_recoveryPath, dbName);
+        Path copyFrom = new Path(f.toURI());
+        if (!newLoc.equals(copyFrom)) {
+          logger.info("Moving " + copyFrom + " to: " + newLoc);
+          try {
+            // The move here needs to handle moving non-empty directories across NFS mounts
+            FileSystem fs = FileSystem.getLocal(_conf);
+            fs.rename(copyFrom, newLoc);
+          } catch (Exception e) {
+            // Fail to move recovery file to new path, just continue on with new DB location
+            logger.error("Failed to move recovery file {} to the path {}",
+              dbName, _recoveryPath.toString(), e);
           }
-          return new File(newLoc.toUri().getPath());
         }
+        return new File(newLoc.toUri().getPath());
       }
     }
 
-    if (_recoveryPath == null) {
-      _recoveryPath = new Path(localDirs[0]);
-    }
     return new File(_recoveryPath.toUri().getPath(), dbName);
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 13472f2ece18489f687757c0cc28cde762aa162a..01db796096f26649cb202d89639f5282f9e29f39 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -70,11 +70,18 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
     val finalState = runSpark(
       false,
       mainClassName(YarnExternalShuffleDriver.getClass),
-      appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
+      appArgs = if (registeredExecFile != null) {
+        Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
+      } else {
+        Seq(result.getAbsolutePath)
+      },
       extraConf = extraSparkConf()
     )
 
     checkResult(finalState, result)
-    assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+
+    if (registeredExecFile != null) {
+      assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
+    }
   }
 }
@@ -105,7 +112,7 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
   val WAIT_TIMEOUT_MILLIS = 10000
 
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
+    if (args.length > 2) {
       // scalastyle:off println
       System.err.println(
         s"""
@@ -121,10 +128,16 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
       .setAppName("External Shuffle Test"))
     val conf = sc.getConf
     val status = new File(args(0))
-    val registeredExecFile = new File(args(1))
+    val registeredExecFile = if (args.length == 2) {
+      new File(args(1))
+    } else {
+      null
+    }
     logInfo("shuffle service executor file = " + registeredExecFile)
     var result = "failure"
-    val execStateCopy = new File(registeredExecFile.getAbsolutePath + "_dup")
+    val execStateCopy = Option(registeredExecFile).map { file =>
+      new File(file.getAbsolutePath + "_dup")
+    }.orNull
     try {
       val data = sc.parallelize(0 until 100, 10).map { x => (x % 10) -> x }.reduceByKey{ _ + _ }.
         collect().toSet
@@ -132,11 +145,15 @@ private object YarnExternalShuffleDriver extends Logging with Matchers {
       data should be ((0 until 10).map{x => x -> (x * 10 + 450)}.toSet)
       result = "success"
       // only one process can open a leveldb file at a time, so we copy the files
-      FileUtils.copyDirectory(registeredExecFile, execStateCopy)
-      assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
+      if (registeredExecFile != null && execStateCopy != null) {
+        FileUtils.copyDirectory(registeredExecFile, execStateCopy)
+        assert(!ShuffleTestAccessor.reloadRegisteredExecutors(execStateCopy).isEmpty)
+      }
     } finally {
       sc.stop()
-      FileUtils.deleteDirectory(execStateCopy)
+      if (execStateCopy != null) {
+        FileUtils.deleteDirectory(execStateCopy)
+      }
       Files.write(result, status, StandardCharsets.UTF_8)
     }
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index a58784f59676ac2c432a1fff6a11d31c34198048..268f4bd13f6c30821d332d9b859230d3a924bca2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -44,6 +44,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
   private[yarn] var yarnConfig: YarnConfiguration = null
   private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
 
+  private var recoveryLocalDir: File = _
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     yarnConfig = new YarnConfiguration()
@@ -54,6 +56,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
     val localDir = Utils.createTempDir()
     yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
+
+    recoveryLocalDir = Utils.createTempDir()
   }
 
   var s1: YarnShuffleService = null
@@ -81,6 +85,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("executor state kept across NM restart") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     // set auth to true to test the secrets recovery
    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
@@ -123,6 +128,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // now we pretend the shuffle service goes down, and comes back up
     s1.stop()
     s2 = new YarnShuffleService
+    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s2.init(yarnConfig)
     s2.secretsFile should be (secretsFile)
     s2.registeredExecutorFile should be (execStateFile)
@@ -140,6 +146,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Act like the NM restarts one more time
     s2.stop()
     s3 = new YarnShuffleService
+    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s3.init(yarnConfig)
     s3.registeredExecutorFile should be (execStateFile)
     s3.secretsFile should be (secretsFile)
@@ -156,6 +163,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("removed applications should not be in registered executor file") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
     s1.init(yarnConfig)
     val secretsFile = s1.secretsFile
@@ -190,6 +198,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
   test("shuffle service should be robust to corrupt registered executor file") {
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s1.init(yarnConfig)
     val app1Id = ApplicationId.newInstance(0, 1)
     val app1Data = makeAppInfo("user", app1Id)
@@ -215,6 +224,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     out.close()
 
     s2 = new YarnShuffleService
+    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s2.init(yarnConfig)
     s2.registeredExecutorFile should be (execStateFile)
 
@@ -234,6 +244,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
 
     // another stop & restart should be fine though (eg., we recover from previous corruption)
     s3 = new YarnShuffleService
+    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
     s3.init(yarnConfig)
     s3.registeredExecutorFile should be (execStateFile)
     val handler3 = s3.blockHandler
@@ -254,14 +265,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1.init(yarnConfig)
     s1._recoveryPath should be (recoveryPath)
     s1.stop()
-
-    // Test recovery path is set inside the shuffle service, this will be happened when NM
-    // recovery is not enabled or there's no NM recovery (Hadoop 2.5-).
-    s2 = new YarnShuffleService
-    s2.init(yarnConfig)
-    s2._recoveryPath should be
-      (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0)))
-    s2.stop()
   }
 
   test("moving recovery file from NM local dir to recovery path") {
@@ -271,6 +274,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
     // dir.
     s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(yarnConfig.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS)(0)))
     // set auth to true to test the secrets recovery
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
     s1.init(yarnConfig)
@@ -308,7 +312,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Simulate s2 is running on Hadoop 2.5+ with NM recovery is enabled.
     assert(execStateFile.exists())
 
-    val recoveryPath = new Path(Utils.createTempDir().toURI)
+    val recoveryPath = new Path(recoveryLocalDir.toURI)
     s2 = new YarnShuffleService
     s2.setRecoveryPath(recoveryPath)
     s2.init(yarnConfig)
@@ -347,10 +351,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     // Set up a read-only local dir.
     val roDir = Utils.createTempDir()
     Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE))
-    yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath())
 
     // Try to start the shuffle service, it should fail.
     val service = new YarnShuffleService()
+    service.setRecoveryPath(new Path(roDir.toURI))
 
     try {
       val error = intercept[ServiceStateException] {
@@ -369,4 +373,12 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     new ApplicationInitializationContext(user, appId, secret)
   }
 
+  test("recovery db should not be created if NM recovery is not enabled") {
+    s1 = new YarnShuffleService
+    s1.init(yarnConfig)
+    s1._recoveryPath should be (null)
+    s1.registeredExecutorFile should be (null)
+    s1.secretsFile should be (null)
+  }
+
 }