Skip to content
Snippets Groups Projects
Commit a3981c28 authored by Thomas Graves's avatar Thomas Graves Committed by Tom Graves
Browse files

[SPARK-17433] YarnShuffleService doesn't handle moving credentials levelDb

The secrets leveldb isn't being moved if you run the Spark shuffle service without YARN NM recovery enabled and then turn it on. This fixes that. I unfortunately missed this when I ported the patch from our internal 2.x branch to the master branch, due to the changes in the recovery path. Note this only applies to master since it is the only place the YARN NM recovery dir is used.

Unit tests ran and tested on an 8 node cluster. Fresh startup with NM recovery, fresh startup with no NM recovery, and switching between no NM recovery and recovery. Also tested running applications to make sure they weren't affected by a rolling upgrade.

Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@apache.org>

Closes #14999 from tgravescs/SPARK-17433.
parent 7098a129
No related branches found
No related tags found
No related merge requests found
...@@ -21,6 +21,7 @@ import java.io.File; ...@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -159,8 +160,7 @@ public class YarnShuffleService extends AuxiliaryService { ...@@ -159,8 +160,7 @@ public class YarnShuffleService extends AuxiliaryService {
// If we don't find one, then we choose a file to use to save the state next time. Even if // If we don't find one, then we choose a file to use to save the state next time. Even if
// an application was stopped while the NM was down, we expect yarn to call stopApplication() // an application was stopped while the NM was down, we expect yarn to call stopApplication()
// when it comes back // when it comes back
registeredExecutorFile = registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
new File(getRecoveryPath().toUri().getPath(), RECOVERY_FILE_NAME);
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
...@@ -196,7 +196,7 @@ public class YarnShuffleService extends AuxiliaryService { ...@@ -196,7 +196,7 @@ public class YarnShuffleService extends AuxiliaryService {
private void createSecretManager() throws IOException { private void createSecretManager() throws IOException {
secretManager = new ShuffleSecretManager(); secretManager = new ShuffleSecretManager();
secretsFile = new File(getRecoveryPath().toUri().getPath(), SECRETS_RECOVERY_FILE_NAME); secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME);
// Make sure this is protected in case its not in the NM recovery dir // Make sure this is protected in case its not in the NM recovery dir
FileSystem fs = FileSystem.getLocal(_conf); FileSystem fs = FileSystem.getLocal(_conf);
...@@ -328,37 +328,59 @@ public class YarnShuffleService extends AuxiliaryService { ...@@ -328,37 +328,59 @@ public class YarnShuffleService extends AuxiliaryService {
} }
/**
 * Get the path specific to this auxiliary service to use for recovery.
 *
 * @param fileName name of the recovery DB file (currently unused; kept so
 *                 subclasses/tests can override per-file behavior)
 * @return the recovery path chosen for this service, or null if none set yet
 */
protected Path getRecoveryPath(String fileName) {
  return _recoveryPath;
}
/**
* Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled
* when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise
* it will uses a YARN local dir.
*/ */
protected Path getRecoveryPath() { protected File initRecoveryDb(String dbFileName) {
if (_recoveryPath != null) {
File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbFileName);
if (recoveryFile.exists()) {
return recoveryFile;
}
}
// db doesn't exist in recovery path go check local dirs for it
String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs");
for (String dir : localDirs) { for (String dir : localDirs) {
File f = new File(new Path(dir).toUri().getPath(), RECOVERY_FILE_NAME); File f = new File(new Path(dir).toUri().getPath(), dbFileName);
if (f.exists()) { if (f.exists()) {
if (_recoveryPath == null) { if (_recoveryPath == null) {
// If NM recovery is not enabled, we should specify the recovery path using NM local // If NM recovery is not enabled, we should specify the recovery path using NM local
// dirs, which is compatible with the old code. // dirs, which is compatible with the old code.
_recoveryPath = new Path(dir); _recoveryPath = new Path(dir);
return f;
} else { } else {
// If NM recovery is enabled and the recovery file exists in old NM local dirs, which // If the recovery path is set then either NM recovery is enabled or another recovery
// means old version of Spark already generated the recovery file, we should copy the // DB has been initialized. If NM recovery is enabled and had set the recovery path
// old file in to a new recovery path for the compatibility. // make sure to move all DBs to the recovery path from the old NM local dirs.
if (!f.renameTo(new File(_recoveryPath.toUri().getPath(), RECOVERY_FILE_NAME))) { // If another DB was initialized first just make sure all the DBs are in the same
// Fail to move recovery file to new path // location.
logger.error("Failed to move recovery file {} to the path {}", File newLoc = new File(_recoveryPath.toUri().getPath(), dbFileName);
RECOVERY_FILE_NAME, _recoveryPath.toString()); if (!newLoc.equals(f)) {
try {
Files.move(f.toPath(), newLoc.toPath());
} catch (Exception e) {
// Fail to move recovery file to new path, just continue on with new DB location
logger.error("Failed to move recovery file {} to the path {}",
dbFileName, _recoveryPath.toString(), e);
}
} }
return newLoc;
} }
break;
} }
} }
if (_recoveryPath == null) { if (_recoveryPath == null) {
_recoveryPath = new Path(localDirs[0]); _recoveryPath = new Path(localDirs[0]);
} }
return _recoveryPath; return new File(_recoveryPath.toUri().getPath(), dbFileName);
} }
/** /**
......
...@@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd ...@@ -267,13 +267,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.stop() s2.stop()
} }
test("moving recovery file form NM local dir to recovery path") { test("moving recovery file from NM local dir to recovery path") {
// This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move // This is to test when Hadoop is upgrade to 2.5+ and NM recovery is enabled, we should move
// old recovery file to the new path to keep compatibility // old recovery file to the new path to keep compatibility
// Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local // Simulate s1 is running on old version of Hadoop in which recovery file is in the NM local
// dir. // dir.
s1 = new YarnShuffleService s1 = new YarnShuffleService
// set auth to true to test the secrets recovery
yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
s1.init(yarnConfig) s1.init(yarnConfig)
val app1Id = ApplicationId.newInstance(0, 1) val app1Id = ApplicationId.newInstance(0, 1)
val app1Data: ApplicationInitializationContext = val app1Data: ApplicationInitializationContext =
...@@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd ...@@ -286,6 +288,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
val execStateFile = s1.registeredExecutorFile val execStateFile = s1.registeredExecutorFile
execStateFile should not be (null) execStateFile should not be (null)
val secretsFile = s1.secretsFile
secretsFile should not be (null)
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER) val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
...@@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd ...@@ -312,10 +316,16 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
s2.init(yarnConfig) s2.init(yarnConfig)
val execStateFile2 = s2.registeredExecutorFile val execStateFile2 = s2.registeredExecutorFile
val secretsFile2 = s2.secretsFile
recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString) recoveryPath.toString should be (new Path(execStateFile2.getParentFile.toURI).toString)
recoveryPath.toString should be (new Path(secretsFile2.getParentFile.toURI).toString)
eventually(timeout(10 seconds), interval(5 millis)) { eventually(timeout(10 seconds), interval(5 millis)) {
assert(!execStateFile.exists()) assert(!execStateFile.exists())
} }
eventually(timeout(10 seconds), interval(5 millis)) {
assert(!secretsFile.exists())
}
val handler2 = s2.blockHandler val handler2 = s2.blockHandler
val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2) val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment