diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index f3621c6beee1ffb20121ce9055bf13e3bd7f7db1..bdc1494cc9da1d2d8efce09b0dd35587505bcd85 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -522,7 +522,7 @@ private object Utils extends Logging { execute(command, new File(".")) } - private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, + private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String, val firstUserLine: Int, val firstUserClass: String) /** * When called inside a class in the spark package, returns the name of the user code class @@ -610,4 +610,67 @@ private object Utils extends Logging { } return false } + + def isSpace(c: Char): Boolean = { + " \t\r\n".indexOf(c) != -1 + } + + /** + * Split a string of potentially quoted arguments from the command line the way that a shell + * would do it to determine arguments to a command. For example, if the string is 'a "b c" d', + * then it would be parsed as three arguments: 'a', 'b c' and 'd'. + */ + def splitCommandString(s: String): Seq[String] = { + val buf = new ArrayBuffer[String] + var inWord = false + var inSingleQuote = false + var inDoubleQuote = false + var curWord = new StringBuilder + def endWord() { + buf += curWord.toString + curWord.clear() + } + var i = 0 + while (i < s.length) { + var nextChar = s.charAt(i) + if (inDoubleQuote) { + if (nextChar == '"') { + inDoubleQuote = false + } else if (nextChar == '\\') { + if (i < s.length - 1) { + // Append the next character directly, because only " and \ may be escaped in + // double quotes after the shell's own expansion + curWord.append(s.charAt(i + 1)) + i += 1 + } + } else { + curWord.append(nextChar) + } + } else if (inSingleQuote) { + if (nextChar == '\'') { + inSingleQuote = false + } else { + curWord.append(nextChar) + } + // Backslashes are not treated specially in single quotes + } else if (nextChar == '"') { + inWord = true + inDoubleQuote = true + } else if (nextChar == '\'') { + inWord = true + inSingleQuote = true + } else if (!isSpace(nextChar)) { + curWord.append(nextChar) + inWord = true + } else if (inWord && isSpace(nextChar)) { + endWord() + inWord = false + } + i += 1 + } + if (inWord || inDoubleQuote || inSingleQuote) { + endWord() + } + return buf + } } diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 4d31657d9e31835ab129aab2f913ebc4541140dd..db580e39abd357805231c19202156e8dc52947fb 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -40,7 +40,7 @@ private[spark] class ExecutorRunner( workerThread.start() // Shutdown hook that kills actors on shutdown. - shutdownHook = new Thread() { + shutdownHook = new Thread() { override def run() { if (process != null) { logInfo("Shutdown hook killing child process.") @@ -87,25 +87,43 @@ private[spark] class ExecutorRunner( Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ command.arguments.map(substituteVariables) } - - /* - * Attention: this must always be aligned with the environment variables in the run scripts and the - * way the JAVA_OPTS are assembled there. + + /** + * Attention: this must always be aligned with the environment variables in the run scripts and + * the way the JAVA_OPTS are assembled there. */ def buildJavaOpts(): Seq[String] = { - val _javaLibPath = if (System.getenv("SPARK_LIBRARY_PATH") == null) { - "" + val libraryOpts = if (System.getenv("SPARK_LIBRARY_PATH") == null) { + Nil + } else { + List("-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH")) + } + + val userOpts = if (System.getenv("SPARK_JAVA_OPTS") == null) { + Nil } else { - "-Djava.library.path=" + System.getenv("SPARK_LIBRARY_PATH") + Utils.splitCommandString(System.getenv("SPARK_JAVA_OPTS")) } - - Seq("-cp", - System.getenv("CLASSPATH"), - System.getenv("SPARK_JAVA_OPTS"), - _javaLibPath, - "-Xms" + memory.toString + "M", - "-Xmx" + memory.toString + "M") - .filter(_ != null) + + val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") + + var classPath = System.getenv("CLASSPATH") + if (System.getenv("SPARK_LAUNCH_WITH_SCALA") == "1") { + // Add the Scala library JARs to the classpath; this is needed when the ExecutorRunner + // was launched with "scala" as the runner (e.g. in spark-shell in local-cluster mode) + // and the Scala libraries won't be in the CLASSPATH environment variable by defalt. + if (System.getenv("SCALA_LIBRARY_PATH") == null && System.getenv("SCALA_HOME") == null) { + logError("Cloud not launch executors: neither SCALA_LIBRARY_PATH nor SCALA_HOME are set") + System.exit(1) + } + val scalaLib = Option(System.getenv("SCALA_LIBRARY_PATH")).getOrElse( + System.getenv("SCALA_HOME") + "/lib") + classPath += ":" + scalaLib + "/scala-library.jar" + + ":" + scalaLib + "/scala-compiler.jar" + + ":" + scalaLib + "/jline.jar" + } + + Seq("-cp", classPath) ++ libraryOpts ++ userOpts ++ memoryOpts } /** Spawn a thread that will redirect a given stream to a file */ @@ -136,6 +154,7 @@ private[spark] class ExecutorRunner( // Launch the process val command = buildCommandSeq() + println("COMMAND: " + command.mkString(" ")) val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() for ((key, value) <- appDesc.command.environment) { diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala index ed4701574fd4d9e6aa02959e428a87bbddc5c997..4a113e16bf5a58d72b724760b403bba0f97a92e6 100644 --- a/core/src/test/scala/spark/UtilsSuite.scala +++ b/core/src/test/scala/spark/UtilsSuite.scala @@ -27,24 +27,49 @@ class UtilsSuite extends FunSuite { assert(os.toByteArray.toList.equals(bytes.toList)) } - test("memoryStringToMb"){ - assert(Utils.memoryStringToMb("1") == 0) - assert(Utils.memoryStringToMb("1048575") == 0) - assert(Utils.memoryStringToMb("3145728") == 3) + test("memoryStringToMb") { + assert(Utils.memoryStringToMb("1") === 0) + assert(Utils.memoryStringToMb("1048575") === 0) + assert(Utils.memoryStringToMb("3145728") === 3) - assert(Utils.memoryStringToMb("1024k") == 1) - assert(Utils.memoryStringToMb("5000k") == 4) - assert(Utils.memoryStringToMb("4024k") == Utils.memoryStringToMb("4024K")) + assert(Utils.memoryStringToMb("1024k") === 1) + assert(Utils.memoryStringToMb("5000k") === 4) + assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K")) - assert(Utils.memoryStringToMb("1024m") == 1024) - assert(Utils.memoryStringToMb("5000m") == 5000) - assert(Utils.memoryStringToMb("4024m") == Utils.memoryStringToMb("4024M")) + assert(Utils.memoryStringToMb("1024m") === 1024) + assert(Utils.memoryStringToMb("5000m") === 5000) + assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M")) - assert(Utils.memoryStringToMb("2g") == 2048) - assert(Utils.memoryStringToMb("3g") == Utils.memoryStringToMb("3G")) + assert(Utils.memoryStringToMb("2g") === 2048) + assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G")) - assert(Utils.memoryStringToMb("2t") == 2097152) - assert(Utils.memoryStringToMb("3t") == Utils.memoryStringToMb("3T")) + assert(Utils.memoryStringToMb("2t") === 2097152) + assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T")) + } + + test("splitCommandString") { + assert(Utils.splitCommandString("") === Seq()) + assert(Utils.splitCommandString("a") === Seq("a")) + assert(Utils.splitCommandString("aaa") === Seq("aaa")) + assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c")) + assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c")) + assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c")) + assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d")) + assert(Utils.splitCommandString("'b c'") === Seq("b c")) + assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c")) + assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d")) + assert(Utils.splitCommandString("\"b c\"") === Seq("b c")) + assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e")) + assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d")) + assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c")) + assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c")) + assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c")) + assert(Utils.splitCommandString("'a'b") === Seq("ab")) + assert(Utils.splitCommandString("'a''b'") === Seq("ab")) + assert(Utils.splitCommandString("\"a\"b") === Seq("ab")) + assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab")) + assert(Utils.splitCommandString("''") === Seq("")) + assert(Utils.splitCommandString("\"\"") === Seq("")) } }