Skip to content
Snippets Groups Projects
Commit 725925cf authored by xuan's avatar xuan Committed by Thomas Graves
Browse files

SPARK-1465: Spark compilation is broken with the latest hadoop-2.4.0 release

YARN-1824 changed the APIs (addToEnvironment, setEnvFromInputString) in Apps, which breaks the Spark build when compiled against hadoop 2.4.0. To fix this, add Spark's own implementations of that functionality, which compile against 2.3 and the other 2.x versions as well.

Author: xuan <xuan@MacBook-Pro.local>
Author: xuan <xuan@macbook-pro.home>

Closes #396 from xgong/master and squashes the following commits:

42b5984 [xuan] Remove two extra imports
bc0926f [xuan] Remove usage of org.apache.hadoop.util.Shell
be89fa7 [xuan] fix Spark compilation is broken with the latest hadoop-2.4.0 release
parent e269c24d
No related branches found
No related tags found
No related merge requests found
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn package org.apache.spark.deploy.yarn
import java.io.File
import java.net.{InetAddress, UnknownHostException, URI} import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer import java.nio.ByteBuffer
...@@ -280,7 +281,8 @@ trait ClientBase extends Logging { ...@@ -280,7 +281,8 @@ trait ClientBase extends Logging {
distCacheMgr.setDistArchivesEnv(env) distCacheMgr.setDistArchivesEnv(env)
// Allow users to specify some environment variables. // Allow users to specify some environment variables.
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
File.pathSeparator)
// Add each SPARK_* key to the environment. // Add each SPARK_* key to the environment.
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
...@@ -382,7 +384,8 @@ object ClientBase { ...@@ -382,7 +384,8 @@ object ClientBase {
YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse( YarnConfiguration.YARN_APPLICATION_CLASSPATH)).getOrElse(
getDefaultYarnApplicationClasspath()) getDefaultYarnApplicationClasspath())
for (c <- classpathEntries) { for (c <- classpathEntries) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim) YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
File.pathSeparator)
} }
val mrClasspathEntries = Option(conf.getStrings( val mrClasspathEntries = Option(conf.getStrings(
...@@ -390,7 +393,8 @@ object ClientBase { ...@@ -390,7 +393,8 @@ object ClientBase {
getDefaultMRApplicationClasspath()) getDefaultMRApplicationClasspath())
if (mrClasspathEntries != null) { if (mrClasspathEntries != null) {
for (c <- mrClasspathEntries) { for (c <- mrClasspathEntries) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim) YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, c.trim,
File.pathSeparator)
} }
} }
} }
...@@ -425,28 +429,29 @@ object ClientBase { ...@@ -425,28 +429,29 @@ object ClientBase {
} }
def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) { def populateClasspath(conf: Configuration, sparkConf: SparkConf, addLog4j: Boolean, env: HashMap[String, String]) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$(),
File.pathSeparator)
// If log4j present, ensure ours overrides all others // If log4j present, ensure ours overrides all others
if (addLog4j) { if (addLog4j) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + LOG4J_PROP) Path.SEPARATOR + LOG4J_PROP, File.pathSeparator)
} }
// Normally the users app.jar is last in case conflicts with spark jars // Normally the users app.jar is last in case conflicts with spark jars
val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false") val userClasspathFirst = sparkConf.get("spark.yarn.user.classpath.first", "false")
.toBoolean .toBoolean
if (userClasspathFirst) { if (userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR) Path.SEPARATOR + APP_JAR, File.pathSeparator)
} }
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + SPARK_JAR) Path.SEPARATOR + SPARK_JAR, File.pathSeparator)
ClientBase.populateHadoopClasspath(conf, env) ClientBase.populateHadoopClasspath(conf, env)
if (!userClasspathFirst) { if (!userClasspathFirst) {
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + APP_JAR) Path.SEPARATOR + APP_JAR, File.pathSeparator)
} }
Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
Path.SEPARATOR + "*") Path.SEPARATOR + "*", File.pathSeparator)
} }
} }
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.spark.deploy.yarn package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URI import java.net.URI
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction import java.security.PrivilegedExceptionAction
...@@ -167,7 +168,8 @@ trait ExecutorRunnableUtil extends Logging { ...@@ -167,7 +168,8 @@ trait ExecutorRunnableUtil extends Logging {
ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) ClientBase.populateClasspath(yarnConf, sparkConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
// Allow users to specify some environment variables // Allow users to specify some environment variables
Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"),
File.pathSeparator)
System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v }
env env
......
...@@ -17,10 +17,16 @@ ...@@ -17,10 +17,16 @@
package org.apache.spark.deploy.yarn package org.apache.spark.deploy.yarn
import java.util.regex.Matcher
import java.util.regex.Pattern
import scala.collection.mutable.HashMap
import org.apache.hadoop.io.Text import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
...@@ -73,4 +79,61 @@ object YarnSparkHadoopUtil { ...@@ -73,4 +79,61 @@ object YarnSparkHadoopUtil {
def getLoggingArgsForContainerCommandLine(): String = { def getLoggingArgsForContainerCommandLine(): String = {
"-Dlog4j.configuration=log4j-spark-container.properties" "-Dlog4j.configuration=log4j-spark-container.properties"
} }
/**
 * Adds `value` to the environment variable `variable` in `env`, appending with
 * `classPathSeparator` when the variable already has a value.
 *
 * Spark's replacement for Hadoop's `Apps.addToEnvironment`, whose signature
 * changed in YARN-1824 (hadoop 2.4.0); keeping our own copy lets Spark compile
 * against all hadoop 2.x versions.
 *
 * @param env environment map, updated in place
 * @param variable name of the environment variable to set or extend
 * @param value value to set or append
 * @param classPathSeparator separator inserted before `value` when appending
 */
def addToEnvironment(
    env: HashMap[String, String],
    variable: String,
    value: String,
    classPathSeparator: String) = {
  // Pattern match instead of `== None` + double `.get`; no mutable var needed.
  val envVariable = env.get(variable) match {
    case Some(existing) => existing + classPathSeparator + value
    case None => value
  }
  // weakIntern key and value to avoid retaining duplicate copies of the many
  // repeated classpath/variable strings (mirrors what Hadoop's Apps did).
  env.put(StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable))
}
/**
 * Parses a comma-separated list of `KEY=VALUE` environment settings and adds
 * each to `env`, first expanding embedded variable references (`$VAR` on Unix,
 * `%VAR%` on Windows) against `env` and then against the process environment.
 * Unresolved references expand to the empty string.
 *
 * Spark's replacement for Hadoop's `Apps.setEnvFromInputString`, whose
 * signature changed in YARN-1824 (hadoop 2.4.0).
 *
 * @param env environment map, updated in place
 * @param envString comma-separated `KEY=VALUE` pairs; may be null or empty
 * @param classPathSeparator separator used when a key already has a value
 */
def setEnvFromInputString(
    env: HashMap[String, String],
    envString: String,
    classPathSeparator: String) = {
  if (envString != null && envString.length() > 0) {
    val childEnvs = envString.split(",")
    val p = Pattern.compile(getEnvironmentVariableRegex())
    for (cEnv <- childEnvs) {
      // Split on the FIRST '=' only, so values that themselves contain '='
      // (e.g. "OPTS=-Dfoo=bar") are kept intact; "KEY=" yields an empty value.
      // The previous unbounded split truncated such values and threw
      // ArrayIndexOutOfBoundsException on entries without a value.
      val parts = cEnv.split("=", 2)
      if (parts.length == 2) {
        val m = p.matcher(parts(1))
        val sb = new StringBuffer
        while (m.find()) {
          val variable = m.group(1)
          // Resolve against the child env first, then the process env;
          // a reference that is not present anywhere becomes "".
          val replace = env.get(variable)
            .orElse(Option(System.getenv(variable)))
            .getOrElse("")
          m.appendReplacement(sb, Matcher.quoteReplacement(replace))
        }
        m.appendTail(sb)
        addToEnvironment(env, parts(0), sb.toString(), classPathSeparator)
      }
      // Malformed entries (no '=') are skipped instead of crashing.
    }
  }
}
/**
 * Returns the regex used to locate environment-variable references inside a
 * value string: `%VAR%` (non-greedy) on Windows, `$VAR` elsewhere. The
 * variable name is captured in group 1.
 */
private def getEnvironmentVariableRegex() : String = {
  if (System.getProperty("os.name").startsWith("Windows")) {
    "%([A-Za-z_][A-Za-z0-9_]*?)%"
  } else {
    "\\$([A-Za-z_][A-Za-z0-9_]*)"
  }
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment