diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index df082e4a927470931ec43ef933edb6537c1ce142..43c8df721d5a18931e88dba94743c526835fe6ea 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.ByteBuffer; +import java.nio.file.Files; import java.util.List; import java.util.Map; @@ -159,8 +160,7 @@ public class YarnShuffleService extends AuxiliaryService { // If we don't find one, then we choose a file to use to save the state next time. Even if // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back - registeredExecutorFile = - new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME); + registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); @@ -196,7 +196,7 @@ public class YarnShuffleService extends AuxiliaryService { private void createSecretManager() throws IOException { secretManager = new ShuffleSecretManager(); - secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME); + secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME); // Make sure this is protected in case its not in the NM recovery dir FileSystem fs = FileSystem.getLocal(_conf); @@ -328,37 +328,59 @@ public class YarnShuffleService extends AuxiliaryService { } /** - * Get the recovery path, this will override the default one to get our own maintained - * recovery path. + * Get the path specific to this auxiliary service to use for recovery. + */ + protected Path getRecoveryPath(String fileName) { + return _recoveryPath; + } + + /** + * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled + * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise + * it will uses a YARN local dir. */ - protected Path getRecoveryPath() { + protected File initRecoveryDb(String dbFileName) { + if (_recoveryPath != null) { + File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName); + if (recoveryFile.exists()) { + return recoveryFile; + } + } + // db doesn't exist in recovery path go check local dirs for it String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); for (String dir : localDirs) { - File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME); + File f = new File(new Path(dir).toUri().getPath(), dbFileName); if (f.exists()) { if (_recoveryPath == null) { // If NM recovery is not enabled, we should specify the recovery path using NM local // dirs, which is compatible with the old code. _recoveryPath = new Path(dir); + return f; } else { - // If NM recovery is enabled and the recovery file exists in old NM local dirs, which - // means old version of Spark already generated the recovery file, we should copy the - // old file in to a new recovery path for the compatibility. - if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) { - // Fail to move recovery file to new path - logger.error("Failed to move recovery file {} to the path {}", - RECOVERY_FILE_NAME, _recoveryPath.toString()); + // If the recovery path is set then either NM recovery is enabled or another recovery + // DB has been initialized. If NM recovery is enabled and had set the recovery path + // make sure to move all DBs to the recovery path from the old NM local dirs. + // If another DB was initialized first just make sure all the DBs are in the same + // location. + File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName); + if (!newLoc.equals(f)) { + try { + Files.move(f.toPath(), newLoc.toPath()); + } catch (Exception e) { + // Fail to move recovery file to new path, just continue on with new DB location + logger.error("Failed to move recovery file {} to the path {}", + dbFileName, _recoveryPath.toString(), e); + } } + return newLoc; } - break; } } - if (_recoveryPath == null) { _recoveryPath = new Path(localDirs[0]); } - return _recoveryPath; + return new File(_recoveryPath.toUri().getPath(), dbFileName); } /** diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index 9a071862bbdb0ec47550033b478f567f46daf180..c86bf7f70c98e0ee95cdc51cd2867453b00238cb 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s2.stop() } - test("moving recovery file form NM local dir to recovery path") { + test("moving recovery file from NM local dir to recovery path") { // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move // old recovery file to the new path to keep compatibility // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local // dir. s1 = new YarnShuffleService + // set auth to true to test the secrets recovery + yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1.init(yarnConfig) val app1Id = ApplicationId.newInstance(0, 1) val app1Data: ApplicationInitializationContext = @@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val execStateFile = s1.registeredExecutorFile execStateFile should not be (null) + val secretsFile = s1.secretsFile + secretsFile should not be (null) val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) @@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s2.init(yarnConfig) val execStateFile2 = s2.registeredExecutorFile + val secretsFile2 = s2.secretsFile + recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString) + recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString) eventually(timeout(10 seconds), interval(5 millis)) { assert(!execStateFile.exists()) } + eventually(timeout(10 seconds), interval(5 millis)) { + assert(!secretsFile.exists()) + } val handler2 = s2.blockHandler val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)