diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index 43c8df721d5a18931e88dba94743c526835fe6ea..ea726e3c8240ea141ad41f1e53a10d1a312c30c7 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -105,7 +105,8 @@ public class YarnShuffleService extends AuxiliaryService { // An entity that manages the shuffle secret per application // This is used only if authentication is enabled - private ShuffleSecretManager secretManager; + @VisibleForTesting + ShuffleSecretManager secretManager; // The actual server that serves shuffle files private TransportServer shuffleServer = null; @@ -197,7 +198,7 @@ public class YarnShuffleService extends AuxiliaryService { private void createSecretManager() throws IOException { secretManager = new ShuffleSecretManager(); secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME); - + // Make sure this is protected in case its not in the NM recovery dir FileSystem fs = FileSystem.getLocal(_conf); fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700)); @@ -306,7 +307,7 @@ public class YarnShuffleService extends AuxiliaryService { } if (db != null) { db.close(); - } + } } catch (Exception e) { logger.error("Exception when stopping service", e); } @@ -329,7 +330,7 @@ public class YarnShuffleService extends AuxiliaryService { /** * Get the path specific to this auxiliary service to use for recovery. - */ + */ protected Path getRecoveryPath(String fileName) { return _recoveryPath; } @@ -345,7 +346,7 @@ public class YarnShuffleService extends AuxiliaryService { if (recoveryFile.exists()) { return recoveryFile; } - } + } // db doesn't exist in recovery path go check local dirs for it String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); for (String dir : localDirs) { diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index c86bf7f70c98e0ee95cdc51cd2867453b00238cb..a58784f59676ac2c432a1fff6a11d31c34198048 100644 --- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.network.yarn import java.io.{DataOutputStream, File, FileOutputStream, IOException} +import java.nio.ByteBuffer import java.nio.file.Files import java.nio.file.attribute.PosixFilePermission._ import java.util.EnumSet @@ -40,15 +41,17 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo import org.apache.spark.util.Utils class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { - private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration + private[yarn] var yarnConfig: YarnConfiguration = null private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager" override def beforeEach(): Unit = { super.beforeEach() + yarnConfig = new YarnConfiguration() yarnConfig.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), classOf[YarnShuffleService].getCanonicalName) yarnConfig.setInt("spark.shuffle.service.port", 0) + yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true) val localDir = Utils.createTempDir() yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath) } @@ -82,12 +85,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1.init(yarnConfig) val app1Id = ApplicationId.newInstance(0, 1) - val app1Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app1Id, null) + val app1Data = makeAppInfo("user", app1Id) s1.initializeApplication(app1Data) val app2Id = ApplicationId.newInstance(0, 2) - val app2Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app2Id, null) + val app2Data = makeAppInfo("user", app2Id) s1.initializeApplication(app2Data) val execStateFile = s1.registeredExecutorFile @@ -160,12 +161,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd val secretsFile = s1.secretsFile secretsFile should be (null) val app1Id = ApplicationId.newInstance(0, 1) - val app1Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app1Id, null) + val app1Data = makeAppInfo("user", app1Id) s1.initializeApplication(app1Data) val app2Id = ApplicationId.newInstance(0, 2) - val app2Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app2Id, null) + val app2Data = makeAppInfo("user", app2Id) s1.initializeApplication(app2Data) val execStateFile = s1.registeredExecutorFile @@ -193,8 +192,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s1 = new YarnShuffleService s1.init(yarnConfig) val app1Id = ApplicationId.newInstance(0, 1) - val app1Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app1Id, null) + val app1Data = makeAppInfo("user", app1Id) s1.initializeApplication(app1Data) val execStateFile = s1.registeredExecutorFile @@ -227,8 +225,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s2.initializeApplication(app1Data) // however, when we initialize a totally new app2, everything is still happy val app2Id = ApplicationId.newInstance(0, 2) - val app2Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app2Id, null) + val app2Data = makeAppInfo("user", app2Id) s2.initializeApplication(app2Data) val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER) resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2) @@ -278,14 +275,15 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true) s1.init(yarnConfig) val app1Id = ApplicationId.newInstance(0, 1) - val app1Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app1Id, null) + val app1Data = makeAppInfo("user", app1Id) s1.initializeApplication(app1Data) val app2Id = ApplicationId.newInstance(0, 2) - val app2Data: ApplicationInitializationContext = - new ApplicationInitializationContext("user", app2Id, null) + val app2Data = makeAppInfo("user", app2Id) s1.initializeApplication(app2Data) + assert(s1.secretManager.getSecretKey(app1Id.toString()) != null) + assert(s1.secretManager.getSecretKey(app2Id.toString()) != null) + val execStateFile = s1.registeredExecutorFile execStateFile should not be (null) val secretsFile = s1.secretsFile @@ -315,6 +313,10 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd s2.setRecoveryPath(recoveryPath) s2.init(yarnConfig) + // Ensure that s2 has loaded known apps from the secrets db. + assert(s2.secretManager.getSecretKey(app1Id.toString()) != null) + assert(s2.secretManager.getSecretKey(app2Id.toString()) != null) + val execStateFile2 = s2.registeredExecutorFile val secretsFile2 = s2.secretsFile @@ -342,19 +344,17 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } test("service throws error if cannot start") { - // Create a different config with a read-only local dir. - val roConfig = new YarnConfiguration(yarnConfig) + // Set up a read-only local dir. val roDir = Utils.createTempDir() Files.setPosixFilePermissions(roDir.toPath(), EnumSet.of(OWNER_READ, OWNER_EXECUTE)) - roConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath()) - roConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true) + yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, roDir.getAbsolutePath()) // Try to start the shuffle service, it should fail. val service = new YarnShuffleService() try { val error = intercept[ServiceStateException] { - service.init(roConfig) + service.init(yarnConfig) } assert(error.getCause().isInstanceOf[IOException]) } finally { @@ -364,4 +364,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd } } + private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = { + val secret = ByteBuffer.wrap(new Array[Byte](0)) + new ApplicationInitializationContext(user, appId, secret) + } + }