From 923cf929003c67963e273fcdcd5b01baf68df8b5 Mon Sep 17 00:00:00 2001 From: "Y.CORP.YAHOO.COM\\tgraves" <tgraves@thatenemy-lm.(none)> Date: Tue, 2 Jul 2013 21:18:59 -0500 Subject: [PATCH] Rework from pull request. Removed --user option from Spark on Yarn Client, made the user of JAVA_HOME environment variable conditional on if its set, and created addCredentials in each of the SparkHadoopUtil classes to only add the credentials when the profile is hadoop2-yarn. --- .../scala/spark/deploy/SparkHadoopUtil.scala | 4 ++++ .../scala/spark/deploy/SparkHadoopUtil.scala | 7 +++++++ .../scala/spark/deploy/yarn/Client.scala | 14 +++++++++----- .../scala/spark/deploy/yarn/ClientArguments.scala | 8 +------- .../scala/spark/deploy/yarn/WorkerRunnable.scala | 12 +++++++----- .../scala/spark/deploy/SparkHadoopUtil.scala | 4 ++++ core/src/main/scala/spark/PairRDDFunctions.scala | 4 ---- core/src/main/scala/spark/SparkContext.scala | 8 -------- core/src/main/scala/spark/rdd/HadoopRDD.scala | 2 ++ .../scala/spark/scheduler/InputFormatInfo.scala | 11 +++-------- .../spark/streaming/PairDStreamFunctions.scala | 4 ---- 11 files changed, 37 insertions(+), 41 deletions(-) diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala index a0fb4fe25d..f1c86de4cc 100644 --- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,5 +1,6 @@ package spark.deploy import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf /** @@ -20,4 +21,7 @@ object SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems def newConfiguration(): Configuration = new Configuration() + + // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + def addCredentials(conf: JobConf) {} } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala index b96c047e10..301a57fffa 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,6 +1,7 @@ package spark.deploy import collection.mutable.HashMap +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration @@ -49,4 +50,10 @@ object SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems // Always create a new config, dont reuse yarnConf. def newConfiguration(): Configuration = new YarnConfiguration(new Configuration()) + + // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + def addCredentials(conf: JobConf) { + val jobCreds = conf.getCredentials(); + jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + } } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala index f20cc31c7c..514c17f241 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala @@ -45,7 +45,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) - appContext.setUser(args.amUser) + appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) submitApp(appContext) @@ -141,9 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null) val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.USER.name, args.amUser) - // set this so that UGI set to correct user in unsecure mode - Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser) // If log4j present, ensure ours overrides all others if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") @@ -171,6 +168,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + // Add each SPARK-* key to the environment System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } return env @@ -224,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " + + var javaCommand = "java"; + val javaHome = System.getenv("JAVA_HOME") + if (javaHome != null && !javaHome.isEmpty()) { + javaCommand = Environment.JAVA_HOME.$() + "/bin/java" + } + + val commands = List[String](javaCommand + " -server " + JAVA_OPTS + " spark.deploy.yarn.ApplicationMaster" + diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala index 24110558e7..07e7edea36 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala @@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) { var workerMemory = 1024 var workerCores = 1 var numWorkers = 2 - var amUser = System.getProperty("user.name") var amQueue = System.getProperty("QUEUE", "default") var amMemory: Int = 512 // TODO @@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) { workerCores = value args = tail - case ("--user") :: value :: tail => - amUser = value - args = tail - case ("--queue") :: value :: tail => amQueue = value args = tail @@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + - " --user USERNAME Run the ApplicationMaster (and slaves) as a different user\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" ) System.exit(exitCode) } diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala index e22d256a84..cc6f3344a1 100644 --- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala +++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala @@ -85,7 +85,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " + + var javaCommand = "java"; + val javaHome = System.getenv("JAVA_HOME") + if (javaHome != null && !javaHome.isEmpty()) { + javaCommand = Environment.JAVA_HOME.$() + "/bin/java" + } + + val commands = List[String](javaCommand + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state. @@ -152,10 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // should we add this ? - Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment()) - // set this so that UGI set to correct user in unsecure mode - Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment()) // If log4j present, ensure ours overrides all others if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala index a0fb4fe25d..f1c86de4cc 100644 --- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala @@ -1,5 +1,6 @@ package spark.deploy import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf /** @@ -20,4 +21,7 @@ object SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems def newConfiguration(): Configuration = new Configuration() + + // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster + def addCredentials(conf: JobConf) {} } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index b397601184..c9d698fc09 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -614,10 +614,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf(self.context.hadoopConfiguration), codec: Option[Class[_ <: CompressionCodec]] = None) { - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = conf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) conf.setOutputKeyClass(keyClass) conf.setOutputValueClass(valueClass) // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index c3a56938b5..6c37203707 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -295,10 +295,6 @@ class SparkContext( valueClass: Class[V], minSplits: Int = defaultMinSplits ): RDD[(K, V)] = { - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = conf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } @@ -311,10 +307,6 @@ class SparkContext( minSplits: Int = defaultMinSplits ) : RDD[(K, V)] = { val conf = new JobConf(hadoopConfiguration) - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = conf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) FileInputFormat.setInputPaths(conf, path) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) } diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index cbf5512e24..07c103503c 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils +import spark.deploy.SparkHadoopUtil import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} import spark.util.NextIterator import org.apache.hadoop.conf.Configurable @@ -50,6 +51,7 @@ class HadoopRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) override def getPartitions: Array[Partition] = { + SparkHadoopUtil.addCredentials(conf); val inputFormat = createInputFormat(conf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(conf) diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala index 30a56d7135..17d0ea4f80 100644 --- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala @@ -9,6 +9,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.conf.Configuration import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.JavaConversions._ +import spark.deploy.SparkHadoopUtil /** @@ -71,10 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { val conf = new JobConf(configuration) - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = conf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + SparkHadoopUtil.addCredentials(conf); FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -94,10 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { val jobConf = new JobConf(configuration) - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = jobConf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) + SparkHadoopUtil.addCredentials(jobConf); FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 20ee1d3c5d..8d0a83d439 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -472,10 +472,6 @@ extends Serializable { outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = new JobConf ) { - // make sure to propogate any credentials from the current user to the jobConf - // for Hadoop security - val jobCreds = conf.getCredentials(); - jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) val saveFunc = (rdd: RDD[(K, V)], time: Time) => { val file = rddToFileName(prefix, suffix, time) rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf) -- GitLab