From 658814c898bec04c31a8e57f8da0103497aac6ec Mon Sep 17 00:00:00 2001 From: Kan Zhang <kzhang@apache.org> Date: Tue, 16 Jun 2015 08:18:26 +0200 Subject: [PATCH] [SPARK-8129] [CORE] [Sec] Pass auth secrets to executors via env variables Env variables are not visible to non-Spark users, based on suggestion from vanzin. Author: Kan Zhang <kzhang@apache.org> Closes #6774 from kanzhang/env and squashes the following commits: 5dd84c6 [Kan Zhang] remove auth secret conf from initial set up for executors 90cb7d2 [Kan Zhang] always filter out auth secret af4d89d [Kan Zhang] minor refactering e88993e [Kan Zhang] pass auth secret to executors via env variable --- .../org/apache/spark/SecurityManager.scala | 17 ++++++-- .../scala/org/apache/spark/SparkConf.scala | 2 +- .../spark/deploy/worker/CommandUtils.scala | 16 ++++++-- .../spark/deploy/worker/DriverRunner.scala | 4 +- .../spark/deploy/worker/ExecutorRunner.scala | 6 +-- .../deploy/worker/CommandUtilsSuite.scala | 39 +++++++++++++++++-- .../deploy/worker/ExecutorRunnerTest.scala | 7 ++-- 7 files changed, 72 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala index 8aed1e20e0..673ef49e7c 100644 --- a/core/src/main/scala/org/apache/spark/SecurityManager.scala +++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala @@ -192,7 +192,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) // key used to store the spark secret in the Hadoop UGI private val sparkSecretLookupKey = "sparkCookie" - private val authOn = sparkConf.getBoolean("spark.authenticate", false) + private val authOn = sparkConf.getBoolean(SecurityManager.SPARK_AUTH_CONF, false) // keep spark.ui.acls.enable for backwards compatibility with 1.0 private var aclsOn = sparkConf.getBoolean("spark.acls.enable", sparkConf.getBoolean("spark.ui.acls.enable", false)) @@ -365,10 +365,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) cookie } else { // user must have set spark.authenticate.secret config - sparkConf.getOption("spark.authenticate.secret") match { + // For Master/Worker, auth secret is in conf; for Executors, it is in env variable + sys.env.get(SecurityManager.ENV_AUTH_SECRET) + .orElse(sparkConf.getOption(SecurityManager.SPARK_AUTH_SECRET_CONF)) match { case Some(value) => value case None => throw new Exception("Error: a secret key must be specified via the " + - "spark.authenticate.secret config") + SecurityManager.SPARK_AUTH_SECRET_CONF + " config") } } sCookie @@ -449,3 +451,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) override def getSaslUser(appId: String): String = getSaslUser() override def getSecretKey(appId: String): String = getSecretKey() } + +private[spark] object SecurityManager { + + val SPARK_AUTH_CONF: String = "spark.authenticate" + val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret" + // This is used to set auth secret to an executor's env variable. It should have the same + // value as SPARK_AUTH_SECERET_CONF set in SparkConf + val ENV_AUTH_SECRET = "_SPARK_AUTH_SECRET" +} diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 46d72841dc..6cf36fbbd6 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -557,7 +557,7 @@ private[spark] object SparkConf extends Logging { def isExecutorStartupConf(name: String): Boolean = { isAkkaConf(name) || name.startsWith("spark.akka") || - name.startsWith("spark.auth") || + (name.startsWith("spark.auth") && name != SecurityManager.SPARK_AUTH_SECRET_CONF) || name.startsWith("spark.ssl") || isSparkPortConf(name) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 0a1d60f58b..45a3f43045 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConversions._ import scala.collection.Map import org.apache.spark.Logging +import org.apache.spark.SecurityManager import org.apache.spark.deploy.Command import org.apache.spark.launcher.WorkerCommandBuilder import org.apache.spark.util.Utils @@ -40,12 +41,14 @@ object CommandUtils extends Logging { */ def buildProcessBuilder( command: Command, + securityMgr: SecurityManager, memory: Int, sparkHome: String, substituteArguments: String => String, classPaths: Seq[String] = Seq[String](), env: Map[String, String] = sys.env): ProcessBuilder = { - val localCommand = buildLocalCommand(command, substituteArguments, classPaths, env) + val localCommand = buildLocalCommand( + command, securityMgr, substituteArguments, classPaths, env) val commandSeq = buildCommandSeq(localCommand, memory, sparkHome) val builder = new ProcessBuilder(commandSeq: _*) val environment = builder.environment() @@ -69,6 +72,7 @@ object CommandUtils extends Logging { */ private def buildLocalCommand( command: Command, + securityMgr: SecurityManager, substituteArguments: String => String, classPath: Seq[String] = Seq[String](), env: Map[String, String]): Command = { @@ -76,20 +80,26 @@ object CommandUtils extends Logging { val libraryPathEntries = command.libraryPathEntries val cmdLibraryPath = command.environment.get(libraryPathName) - val newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) { + var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) { val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName) command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator))) } else { command.environment } + // set auth secret to env variable if needed + if (securityMgr.isAuthenticationEnabled) { + newEnvironment += (SecurityManager.ENV_AUTH_SECRET -> securityMgr.getSecretKey) + } + Command( command.mainClass, command.arguments.map(substituteArguments), newEnvironment, command.classPathEntries ++ classPath, Seq[String](), // library path already captured in environment variable - command.javaOpts) + // filter out auth secret from java options + command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) } /** Spawn a thread that will redirect a given stream to a file */ diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index ef7a703bff..1386055eb8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -85,8 +85,8 @@ private[deploy] class DriverRunner( } // TODO: If we add ability to submit multiple jars they should also be added here - val builder = CommandUtils.buildProcessBuilder(driverDesc.command, driverDesc.mem, - sparkHome.getAbsolutePath, substituteVariables) + val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager, + driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables) launchDriver(builder, driverDir, driverDesc.supervise) } catch { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7aa85b732f..fff17e1095 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -25,7 +25,7 @@ import akka.actor.ActorRef import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{SecurityManager, SparkConf, Logging} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged import org.apache.spark.util.Utils @@ -125,8 +125,8 @@ private[deploy] class ExecutorRunner( private def fetchAndRunExecutor() { try { // Launch the process - val builder = CommandUtils.buildProcessBuilder(appDesc.command, memory, - sparkHome.getAbsolutePath, substituteVariables) + val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf), + memory, sparkHome.getAbsolutePath, substituteVariables) val command = builder.command() logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala index 5b3930c0b0..7101cb9978 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/CommandUtilsSuite.scala @@ -17,21 +17,52 @@ package org.apache.spark.deploy.worker -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.Command import org.apache.spark.util.Utils -import org.scalatest.Matchers +import org.scalatest.{Matchers, PrivateMethodTester} -class CommandUtilsSuite extends SparkFunSuite with Matchers { +class CommandUtilsSuite extends SparkFunSuite with Matchers with PrivateMethodTester { test("set libraryPath correctly") { val appId = "12345-worker321-9876" val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val cmd = new Command("mainClass", Seq(), Map(), Seq(), Seq("libraryPathToB"), Seq()) - val builder = CommandUtils.buildProcessBuilder(cmd, 512, sparkHome, t => t) + val builder = CommandUtils.buildProcessBuilder( + cmd, new SecurityManager(new SparkConf), 512, sparkHome, t => t) val libraryPath = Utils.libraryPathEnvName val env = builder.environment env.keySet should contain(libraryPath) assert(env.get(libraryPath).startsWith("libraryPathToB")) } + + test("auth secret shouldn't appear in java opts") { + val buildLocalCommand = PrivateMethod[Command]('buildLocalCommand) + val conf = new SparkConf + val secret = "This is the secret sauce" + // set auth secret + conf.set(SecurityManager.SPARK_AUTH_SECRET_CONF, secret) + val command = new Command("mainClass", Seq(), Map(), Seq(), Seq("lib"), + Seq("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF + "=" + secret)) + + // auth is not set + var cmd = CommandUtils invokePrivate buildLocalCommand( + command, new SecurityManager(conf), (t: String) => t, Seq(), Map()) + assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) + assert(!cmd.environment.contains(SecurityManager.ENV_AUTH_SECRET)) + + // auth is set to false + conf.set(SecurityManager.SPARK_AUTH_CONF, "false") + cmd = CommandUtils invokePrivate buildLocalCommand( + command, new SecurityManager(conf), (t: String) => t, Seq(), Map()) + assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) + assert(!cmd.environment.contains(SecurityManager.ENV_AUTH_SECRET)) + + // auth is set to true + conf.set(SecurityManager.SPARK_AUTH_CONF, "true") + cmd = CommandUtils invokePrivate buildLocalCommand( + command, new SecurityManager(conf), (t: String) => t, Seq(), Map()) + assert(!cmd.javaOpts.exists(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) + assert(cmd.environment(SecurityManager.ENV_AUTH_SECRET) === secret) + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 3da9927889..bed6f3ea61 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -22,19 +22,20 @@ import java.io.File import scala.collection.JavaConversions._ import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} class ExecutorRunnerTest extends SparkFunSuite { test("command includes appId") { val appId = "12345-worker321-9876" + val conf = new SparkConf val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, - "publicAddr", new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), + "publicAddr", new File(sparkHome), new File("ooga"), "blah", conf, Seq("localDir"), ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder( - appDesc.command, 512, sparkHome, er.substituteVariables) + appDesc.command, new SecurityManager(conf), 512, sparkHome, er.substituteVariables) assert(builder.command().last === appId) } } -- GitLab