diff --git a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c..f1c86de4cc96154b4254d2cfee6e07ad46602c2f 100644
--- a/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop1/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
 package spark.deploy
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 
 
 /**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
+
+  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  def addCredentials(conf: JobConf) {}
 }
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
index b96c047e10e7199328a9b82e378437e9d6492b4d..301a57fffa2ca31df37eb7ca5da49a8555e30b6c 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,6 +1,7 @@
 package spark.deploy
 
 import collection.mutable.HashMap
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.conf.Configuration
@@ -49,4 +50,10 @@ object SparkHadoopUtil {
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   // Always create a new config, dont reuse yarnConf.
   def newConfiguration(): Configuration = new YarnConfiguration(new Configuration())
+
+  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  def addCredentials(conf: JobConf) {
+    val jobCreds = conf.getCredentials();
+    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+  }
 }
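
Taken together, the hadoop1 no-op and the YARN token merge above give call sites one profile-neutral hook. A REPL-style usage sketch (the fresh JobConf is a stand-in for whatever configuration a job is about to hand to Hadoop):

    import org.apache.hadoop.mapred.JobConf
    import spark.deploy.SparkHadoopUtil

    val conf = new JobConf()
    // No-op when built against hadoop1/hadoop2; on the hadoop2-yarn build this
    // merges the current user's tokens and secrets into the job's Credentials.
    SparkHadoopUtil.addCredentials(conf)
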
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
index f20cc31c7c4e3102d278fecac78f988b690c77c7..514c17f241eba48297ab69a615941fd33a88744f 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/Client.scala
@@ -45,7 +45,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
 
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(amContainer)
-    appContext.setUser(args.amUser)
+    appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName())
 
     submitApp(appContext)
 
@@ -141,9 +141,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null)
 
     val env = new HashMap[String, String]()
-    Apps.addToEnvironment(env, Environment.USER.name, args.amUser)
-    // set this so that UGI set to correct user in unsecure mode
-    Apps.addToEnvironment(env, "HADOOP_USER_NAME", args.amUser)
 
     // If log4j present, ensure ours overrides all others
     if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./")
@@ -171,6 +168,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
     }
 
+    // Add each SPARK-* key to the environment
     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
     return env
   }
@@ -224,7 +222,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     }
 
     // Command for the ApplicationMaster
-    val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+    var javaCommand = "java";
+    val javaHome = System.getenv("JAVA_HOME")
+    if (javaHome != null && !javaHome.isEmpty()) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
       " -server " +
       JAVA_OPTS +
       " spark.deploy.yarn.ApplicationMaster" +
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
index 24110558e7da8333efae3fb25a0f3d375e610ba4..07e7edea36313b4690a932cca56388723332f794 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/ClientArguments.scala
@@ -13,7 +13,6 @@ class ClientArguments(val args: Array[String]) {
   var workerMemory = 1024
   var workerCores = 1
   var numWorkers = 2
-  var amUser = System.getProperty("user.name")
   var amQueue = System.getProperty("QUEUE", "default")
   var amMemory: Int = 512
   // TODO
@@ -58,10 +57,6 @@ class ClientArguments(val args: Array[String]) {
           workerCores = value
           args = tail
 
-        case ("--user") :: value :: tail =>
-          amUser = value
-          args = tail
-
        case ("--queue") :: value :: tail =>
          amQueue = value
          args = tail
@@ -96,8 +91,7 @@ class ClientArguments(val args: Array[String]) {
      "  --worker-cores NUM   Number of cores for the workers (Default: 1). This is unsused right now.\n" +
      "  --master-memory MEM  Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
      "  --worker-memory MEM  Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')\n" +
-      "  --user USERNAME      Run the ApplicationMaster (and slaves) as a different user\n" +
+      "  --queue QUEUE        The hadoop queue to use for allocation requests (Default: 'default')"
      )
    System.exit(exitCode)
  }
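
With --user gone, the ApplicationMaster user always comes from the Hadoop security context, which is what the setUser(UserGroupInformation.getCurrentUser().getShortUserName()) call above does. A REPL-style sketch of that lookup (no new API; on an insecure cluster the UGI falls back to the OS user, and with Kerberos it is the principal's short name):

    import org.apache.hadoop.security.UserGroupInformation

    val submitter = UserGroupInformation.getCurrentUser().getShortUserName()
    println("ApplicationMaster will run as: " + submitter)
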
diff --git a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
index e22d256a8444f87790d231875255fb2fbe5dd176..cc6f3344a1836970cfd55516c96860f93ee95463 100644
--- a/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
+++ b/core/src/hadoop2-yarn/scala/spark/deploy/yarn/WorkerRunnable.scala
@@ -85,7 +85,13 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     credentials.writeTokenStorageToStream(dob)
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
-    val commands = List[String](Environment.JAVA_HOME.$() + "/bin/java " +
+    var javaCommand = "java";
+    val javaHome = System.getenv("JAVA_HOME")
+    if (javaHome != null && !javaHome.isEmpty()) {
+      javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
+    }
+
+    val commands = List[String](javaCommand +
       " -server " +
       // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
       // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state.
@@ -152,10 +158,6 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
 
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    // should we add this ?
-    Apps.addToEnvironment(env, Environment.USER.name, Utils.getUserNameFromEnvironment())
-    // set this so that UGI set to correct user in unsecure mode
-    Apps.addToEnvironment(env, "HADOOP_USER_NAME", Utils.getUserNameFromEnvironment())
 
     // If log4j present, ensure ours overrides all others
     if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
diff --git a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
index a0fb4fe25d188b02f1ac62245eaaa0de86b5b46c..f1c86de4cc96154b4254d2cfee6e07ad46602c2f 100644
--- a/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/hadoop2/scala/spark/deploy/SparkHadoopUtil.scala
@@ -1,5 +1,6 @@
 package spark.deploy
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
 
 
 /**
@@ -20,4 +21,7 @@ object SparkHadoopUtil {
 
   // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
   def newConfiguration(): Configuration = new Configuration()
+
+  // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+  def addCredentials(conf: JobConf) {}
 }
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index b397601184924a77c3b69681907cf680bb2efa1f..c9d698fc090ad2a513f2b6ca1a52a9f1140a3b09 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -614,10 +614,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf(self.context.hadoopConfiguration),
       codec: Option[Class[_ <: CompressionCodec]] = None) {
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
     conf.setOutputKeyClass(keyClass)
     conf.setOutputValueClass(valueClass)
     // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c3a56938b5c6b6c3c8c7e74ca1c1ebe8077833bc..6c37203707b4218c0195b3196a7c7094b25c8c47 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -295,10 +295,6 @@ class SparkContext(
       valueClass: Class[V],
       minSplits: Int = defaultMinSplits
       ): RDD[(K, V)] = {
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
 
@@ -311,10 +307,6 @@ class SparkContext(
       minSplits: Int = defaultMinSplits
       ) : RDD[(K, V)] = {
     val conf = new JobConf(hadoopConfiguration)
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
     FileInputFormat.setInputPaths(conf, path)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
   }
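
With the boilerplate gone from hadoopRDD and hadoopFile, callers build their JobConf exactly as before; the credential merge now happens once, inside HadoopRDD.getPartitions in the next file. A usage sketch, assuming an existing SparkContext named sc and a placeholder HDFS path:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    val conf = new JobConf(sc.hadoopConfiguration)
    FileInputFormat.setInputPaths(conf, "hdfs://namenode/some/path")  // placeholder
    val rdd = sc.hadoopRDD(conf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
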
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index cbf5512e247a46773294a428b8f2d6f9d5034876..07c103503c87bb1d82ba4df3c65b03dcd21649d4 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
+import spark.deploy.SparkHadoopUtil
 import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
 import spark.util.NextIterator
 import org.apache.hadoop.conf.Configurable
@@ -50,6 +51,7 @@ class HadoopRDD[K, V](
   private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
 
   override def getPartitions: Array[Partition] = {
+    SparkHadoopUtil.addCredentials(conf);
     val inputFormat = createInputFormat(conf)
     if (inputFormat.isInstanceOf[Configurable]) {
       inputFormat.asInstanceOf[Configurable].setConf(conf)
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
index 30a56d7135a9b2bac063beae7deac90c8ed8b6dd..17d0ea4f80f6401299cf153d35f81a95485f0b35 100644
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
@@ -9,6 +9,7 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.conf.Configuration
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConversions._
+import spark.deploy.SparkHadoopUtil
 
 
 /**
@@ -71,10 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
     val conf = new JobConf(configuration)
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+    SparkHadoopUtil.addCredentials(conf);
     FileInputFormat.setInputPaths(conf, path)
 
     val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
@@ -94,10 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
   // This method does not expect failures, since validate has already passed ...
   private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
     val jobConf = new JobConf(configuration)
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = jobConf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
+    SparkHadoopUtil.addCredentials(jobConf);
     FileInputFormat.setInputPaths(jobConf, path)
 
     val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
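
One way to observe what the centralized call does (a hypothetical REPL check, not code from this patch): the token count of a JobConf never shrinks, because mergeAll only copies the current user's delegation tokens in.

    import org.apache.hadoop.mapred.JobConf
    import spark.deploy.SparkHadoopUtil

    val conf = new JobConf()
    val before = conf.getCredentials().numberOfTokens()
    SparkHadoopUtil.addCredentials(conf)
    val after = conf.getCredentials().numberOfTokens()
    assert(after >= before)  // equal on hadoop1/hadoop2 (no-op) or when there are no tokens
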
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 20ee1d3c5dec5320b8cc01cd0356dcbf474f60dd..8d0a83d439996c70ba024f8460b5ca7766241d54 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -472,10 +472,6 @@ extends Serializable {
       outputFormatClass: Class[_ <: OutputFormat[_, _]],
       conf: JobConf = new JobConf
     ) {
-    // make sure to propogate any credentials from the current user to the jobConf
-    // for Hadoop security
-    val jobCreds = conf.getCredentials();
-    jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
     val saveFunc = (rdd: RDD[(K, V)], time: Time) => {
       val file = rddToFileName(prefix, suffix, time)
       rdd.saveAsHadoopFile(file, keyClass, valueClass, outputFormatClass, conf)
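
The net effect of the patch, condensed into one sketch (the two object names are illustrative; the real code keeps a single object SparkHadoopUtil whose body differs per build profile):

    import org.apache.hadoop.mapred.JobConf
    import org.apache.hadoop.security.UserGroupInformation

    object Hadoop1SparkHadoopUtil {            // core/src/hadoop1 and core/src/hadoop2 flavor
      def addCredentials(conf: JobConf) {}     // nothing to attach off-YARN
    }

    object YarnSparkHadoopUtil {               // core/src/hadoop2-yarn flavor
      def addCredentials(conf: JobConf) {
        // copy the submitting user's tokens and secrets into the job configuration
        conf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
      }
    }
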