Skip to content
Snippets Groups Projects
Commit 15b00914 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Some fixes to the launch-java-directly change:

- Split SPARK_JAVA_OPTS into multiple command-line arguments if it
  contains spaces; this splitting follows quoting rules in bash
- Add the Scala JARs to the classpath if they're not in the CLASSPATH
  variable because the ExecutorRunner is launched with "scala" (this can
  happen when using local-cluster URLs in spark-shell)
parent 7680ce0b
No related branches found
No related tags found
No related merge requests found
......@@ -522,7 +522,7 @@ private object Utils extends Logging {
execute(command, new File("."))
}
private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
val firstUserLine: Int, val firstUserClass: String)
/**
* When called inside a class in the spark package, returns the name of the user code class
......@@ -610,4 +610,67 @@ private object Utils extends Logging {
}
return false
}
def isSpace(c: Char): Boolean = {
" \t\r\n".indexOf(c) != -1
}
/**
* Split a string of potentially quoted arguments from the command line the way that a shell
* would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
* then it would be parsed as three arguments: 'a', 'b c' and 'd'.
*/
def splitCommandString(s: String): Seq[String] = {
val buf = new ArrayBuffer[String]
var inWord = false
var inSingleQuote = false
var inDoubleQuote = false
var curWord = new StringBuilder
def endWord() {
buf += curWord.toString
curWord.clear()
}
var i = 0
while (i < s.length) {
var nextChar = s.charAt(i)
if (inDoubleQuote) {
if (nextChar == '"') {
inDoubleQuote = false
} else if (nextChar == '\\') {
if (i < s.length - 1) {
// Append the next character directly, because only " and \ may be escaped in
// double quotes after the shell's own expansion
curWord.append(s.charAt(i + 1))
i += 1
}
} else {
curWord.append(nextChar)
}
} else if (inSingleQuote) {
if (nextChar == '\'') {
inSingleQuote = false
} else {
curWord.append(nextChar)
}
// Backslashes are not treated specially in single quotes
} else if (nextChar == '"') {
inWord = true
inDoubleQuote = true
} else if (nextChar == '\'') {
inWord = true
inSingleQuote = true
} else if (!isSpace(nextChar)) {
curWord.append(nextChar)
inWord = true
} else if (inWord && isSpace(nextChar)) {
endWord()
inWord = false
}
i += 1
}
if (inWord || inDoubleQuote || inSingleQuote) {
endWord()
}
return buf
}
}
......@@ -40,7 +40,7 @@ private[spark] class ExecutorRunner(
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
shutdownHook = new Thread() {
override def run() {
if (process != null) {
logInfo("Shutdown hook killing child process.")
......@@ -87,25 +87,43 @@ private[spark] class ExecutorRunner(
Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++
command.arguments.map(substituteVariables)
}
/*
* Attention: this must always be aligned with the environment variables in the run scripts and the
* way the JAVA_OPTS are assembled there.
/**
* Attention: this must always be aligned with the environment variables in the run scripts and
* the way the JAVA_OPTS are assembled there.
*/
def buildJavaOpts(): Seq[String] = {
val _javaLibPath = if (System.getenv("SPARK_LIBRARY_PATH") == null) {
""
val libraryOpts = if (System.getenv("SPARK_LIBRARY_PATH") == null) {
Nil
} else {
List("-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH"))
}
val userOpts = if (System.getenv("SPARK_JAVA_OPTS") == null) {
Nil
} else {
"-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH")
Utils.splitCommandString(System.getenv("SPARK_JAVA_OPTS"))
}
Seq("-cp",
System.getenv("CLASSPATH"),
System.getenv("SPARK_JAVA_OPTS"),
_javaLibPath,
"-Xms" + memory.toString + "M",
"-Xmx" + memory.toString + "M")
.filter(_ != null)
val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
var classPath = System.getenv("CLASSPATH")
if (System.getenv("SPARK_LAUNCH_WITH_SCALA") == "1") {
// Add the Scala library JARs to the classpath; this is needed when the ExecutorRunner
// was launched with "scala" as the runner (e.g. in spark-shell in local-cluster mode)
// and the Scala libraries won't be in the CLASSPATH environment variable by defalt.
if (System.getenv("SCALA_LIBRARY_PATH") == null && System.getenv("SCALA_HOME") == null) {
logError("Cloud not launch executors: neither SCALA_LIBRARY_PATH nor SCALA_HOME are set")
System.exit(1)
}
val scalaLib = Option(System.getenv("SCALA_LIBRARY_PATH")).getOrElse(
System.getenv("SCALA_HOME") + "/lib")
classPath += ":" + scalaLib + "/scala-library.jar" +
":" + scalaLib + "/scala-compiler.jar" +
":" + scalaLib + "/jline.jar"
}
Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts
}
/** Spawn a thread that will redirect a given stream to a file */
......@@ -136,6 +154,7 @@ private[spark] class ExecutorRunner(
// Launch the process
val command = buildCommandSeq()
println("COMMAND: " + command.mkString(" "))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
......
......@@ -27,24 +27,49 @@ class UtilsSuite extends FunSuite {
assert(os.toByteArray.toList.equals(bytes.toList))
}
test("memoryStringToMb"){
assert(Utils.memoryStringToMb("1") == 0)
assert(Utils.memoryStringToMb("1048575") == 0)
assert(Utils.memoryStringToMb("3145728") == 3)
test("memoryStringToMb") {
assert(Utils.memoryStringToMb("1") === 0)
assert(Utils.memoryStringToMb("1048575") === 0)
assert(Utils.memoryStringToMb("3145728") === 3)
assert(Utils.memoryStringToMb("1024k") == 1)
assert(Utils.memoryStringToMb("5000k") == 4)
assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K"))
assert(Utils.memoryStringToMb("1024k") === 1)
assert(Utils.memoryStringToMb("5000k") === 4)
assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K"))
assert(Utils.memoryStringToMb("1024m") == 1024)
assert(Utils.memoryStringToMb("5000m") == 5000)
assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M"))
assert(Utils.memoryStringToMb("1024m") === 1024)
assert(Utils.memoryStringToMb("5000m") === 5000)
assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M"))
assert(Utils.memoryStringToMb("2g") == 2048)
assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G"))
assert(Utils.memoryStringToMb("2g") === 2048)
assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G"))
assert(Utils.memoryStringToMb("2t") == 2097152)
assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T"))
assert(Utils.memoryStringToMb("2t") === 2097152)
assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T"))
}
test("splitCommandString") {
assert(Utils.splitCommandString("") === Seq())
assert(Utils.splitCommandString("a") === Seq("a"))
assert(Utils.splitCommandString("aaa") === Seq("aaa"))
assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c"))
assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c"))
assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c"))
assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d"))
assert(Utils.splitCommandString("'b c'") === Seq("b c"))
assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c"))
assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d"))
assert(Utils.splitCommandString("\"b c\"") === Seq("b c"))
assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e"))
assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d"))
assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c"))
assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c"))
assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c"))
assert(Utils.splitCommandString("'a'b") === Seq("ab"))
assert(Utils.splitCommandString("'a''b'") === Seq("ab"))
assert(Utils.splitCommandString("\"a\"b") === Seq("ab"))
assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab"))
assert(Utils.splitCommandString("''") === Seq(""))
assert(Utils.splitCommandString("\"\"") === Seq(""))
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment