diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 0c77123740852ce52317c10da80cf7bc8b299065..19861b81c256292544307d645fa7182455123f07 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -45,12 +45,7 @@ public class SparkLauncherSuite { private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class); private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d"); - private SparkLauncher launcher; - - @Before - public void configureLauncher() { - launcher = new SparkLauncher().setSparkHome(System.getProperty("spark.test.home")); - } + private final SparkLauncher launcher = new SparkLauncher(); @Test public void testSparkArgumentHandling() throws Exception { @@ -101,60 +96,6 @@ public class SparkLauncherSuite { assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key())); } - @Test(expected=IllegalStateException.class) - public void testRedirectTwiceFails() throws Exception { - launcher.setAppResource("fake-resource.jar") - .setMainClass("my.fake.class.Fake") - .redirectError() - .redirectError(ProcessBuilder.Redirect.PIPE) - .launch(); - } - - @Test(expected=IllegalStateException.class) - public void testRedirectToLogWithOthersFails() throws Exception { - launcher.setAppResource("fake-resource.jar") - .setMainClass("my.fake.class.Fake") - .redirectToLog("fakeLog") - .redirectError(ProcessBuilder.Redirect.PIPE) - .launch(); - } - - @Test - public void testRedirectErrorToOutput() throws Exception { - launcher.redirectError(); - assertTrue(launcher.redirectErrorStream); - } - - @Test - public void testRedirectsSimple() throws Exception { - launcher.redirectError(ProcessBuilder.Redirect.PIPE); - assertNotNull(launcher.errorStream); - assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE); - - launcher.redirectOutput(ProcessBuilder.Redirect.PIPE); - assertNotNull(launcher.outputStream); - assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE); - } - - @Test - public void testRedirectLastWins() throws Exception { - launcher.redirectError(ProcessBuilder.Redirect.PIPE) - .redirectError(ProcessBuilder.Redirect.INHERIT); - assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT); - - launcher.redirectOutput(ProcessBuilder.Redirect.PIPE) - .redirectOutput(ProcessBuilder.Redirect.INHERIT); - assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT); - } - - @Test - public void testRedirectToLog() throws Exception { - launcher.redirectToLog("fakeLogger"); - assertTrue(launcher.redirectToLog); - assertTrue(launcher.builder.getEffectiveConfig() - .containsKey(SparkLauncher.CHILD_PROCESS_LOGGER_NAME)); - } - @Test public void testChildProcLauncher() throws Exception { // This test is failed on Windows due to the failure of initiating executors diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 8004247423f3c1b5c1481b6a6eaef7485a1ea6a3..860ab35852331623a1083e47e4cc7f2795a7b9af 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -245,6 +245,9 @@ abstract class AbstractCommandBuilder { String getSparkHome() { String path = getenv(ENV_SPARK_HOME); + if (path == null && "1".equals(getenv("SPARK_TESTING"))) { + path = System.getProperty("spark.test.home"); + } checkState(path != null, "Spark home not found; set it explicitly or use the SPARK_HOME environment variable."); return path; diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 12bf29d3b1aa80a7c03e6d26aa32e5a2771b9f32..3ce4b79ee9c0565fac9d6cf5b8028250029f8275 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,6 +18,7 @@ package org.apache.spark.launcher; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; @@ -113,10 +114,12 @@ class ChildProcAppHandle implements SparkAppHandle { return secret; } - void setChildProc(Process childProc, String loggerName) { + void setChildProc(Process childProc, String loggerName, InputStream logStream) { this.childProc = childProc; - this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName, - SparkLauncher.REDIRECTOR_FACTORY); + if (logStream != null) { + this.redirector = new OutputRedirector(logStream, loggerName, + SparkLauncher.REDIRECTOR_FACTORY); + } } void setConnection(LauncherConnection connection) { @@ -146,6 +149,11 @@ class ChildProcAppHandle implements SparkAppHandle { fireEvent(true); } + // Visible for testing. + boolean isRunning() { + return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive()); + } + private synchronized void fireEvent(boolean isInfoChanged) { if (listeners != null) { for (Listener l : listeners) { diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index ff8045390c157355e3f2f124dad9fb9b9086ad4d..63abae9a1c49d32541ddae783869cbb61e494b50 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -71,4 +71,8 @@ class OutputRedirector { active = false; } + boolean isAlive() { + return thread.isAlive(); + } + } diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java index ea56214d2390c264ae613878c555377efc8119e7..b83fe1b2d01cbb982e4cb702a24daadc322ad3e3 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java @@ -19,6 +19,7 @@ package org.apache.spark.launcher; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -110,7 +111,6 @@ public class SparkLauncher { // Visible for testing. final SparkSubmitCommandBuilder builder; File workingDir; - boolean redirectToLog; boolean redirectErrorStream; ProcessBuilder.Redirect errorStream; ProcessBuilder.Redirect outputStream; @@ -446,7 +446,6 @@ public class SparkLauncher { */ public SparkLauncher redirectToLog(String loggerName) { setConf(CHILD_PROCESS_LOGGER_NAME, loggerName); - redirectToLog = true; return this; } @@ -459,11 +458,22 @@ public class SparkLauncher { * @return A process handle for the Spark app. */ public Process launch() throws IOException { - Process childProc = createBuilder().start(); - if (redirectToLog) { - String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); - new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY); + ProcessBuilder pb = createBuilder(); + + boolean outputToLog = outputStream == null; + boolean errorToLog = !redirectErrorStream && errorStream == null; + + String loggerName = getLoggerName(); + if (loggerName != null && outputToLog && errorToLog) { + pb.redirectErrorStream(true); + } + + Process childProc = pb.start(); + if (loggerName != null) { + InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream(); + new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY); } + return childProc; } @@ -498,30 +508,35 @@ public class SparkLauncher { handle.addListener(l); } - String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); + String loggerName = getLoggerName(); ProcessBuilder pb = createBuilder(); + + boolean outputToLog = outputStream == null; + boolean errorToLog = !redirectErrorStream && errorStream == null; + // Only setup stderr + stdout to logger redirection if user has not otherwise configured output // redirection. - if (loggerName == null) { - String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); - if (appName == null) { - if (builder.appName != null) { - appName = builder.appName; - } else if (builder.mainClass != null) { - int dot = builder.mainClass.lastIndexOf("."); - if (dot >= 0 && dot < builder.mainClass.length() - 1) { - appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); - } else { - appName = builder.mainClass; - } - } else if (builder.appResource != null) { - appName = new File(builder.appResource).getName(); + if (loggerName == null && (outputToLog || errorToLog)) { + String appName; + if (builder.appName != null) { + appName = builder.appName; + } else if (builder.mainClass != null) { + int dot = builder.mainClass.lastIndexOf("."); + if (dot >= 0 && dot < builder.mainClass.length() - 1) { + appName = builder.mainClass.substring(dot + 1, builder.mainClass.length()); } else { - appName = String.valueOf(COUNTER.incrementAndGet()); + appName = builder.mainClass; } + } else if (builder.appResource != null) { + appName = new File(builder.appResource).getName(); + } else { + appName = String.valueOf(COUNTER.incrementAndGet()); } String loggerPrefix = getClass().getPackage().getName(); loggerName = String.format("%s.app.%s", loggerPrefix, appName); + } + + if (outputToLog && errorToLog) { pb.redirectErrorStream(true); } @@ -529,7 +544,12 @@ public class SparkLauncher { String.valueOf(LauncherServer.getServerInstance().getPort())); pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret()); try { - handle.setChildProc(pb.start(), loggerName); + Process child = pb.start(); + InputStream logStream = null; + if (loggerName != null) { + logStream = outputToLog ? child.getInputStream() : child.getErrorStream(); + } + handle.setChildProc(child, loggerName, logStream); } catch (IOException ioe) { handle.kill(); throw ioe; @@ -538,10 +558,9 @@ public class SparkLauncher { return handle; } - private ProcessBuilder createBuilder() { + private ProcessBuilder createBuilder() throws IOException { List<String> cmd = new ArrayList<>(); - String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; - cmd.add(join(File.separator, builder.getSparkHome(), "bin", script)); + cmd.add(findSparkSubmit()); cmd.addAll(builder.buildSparkSubmitArgs()); // Since the child process is a batch script, let's quote things so that special characters are @@ -568,11 +587,11 @@ public class SparkLauncher { // Similarly, if redirectToLog is specified, no other redirections should be specified. checkState(!redirectErrorStream || errorStream == null, "Cannot specify both redirectError() and redirectError(...) "); - checkState(!redirectToLog || - (!redirectErrorStream && errorStream == null && outputStream == null), + checkState(getLoggerName() == null || + ((!redirectErrorStream && errorStream == null) || outputStream == null), "Cannot used redirectToLog() in conjunction with other redirection methods."); - if (redirectErrorStream || redirectToLog) { + if (redirectErrorStream) { pb.redirectErrorStream(true); } if (errorStream != null) { @@ -585,6 +604,16 @@ public class SparkLauncher { return pb; } + // Visible for testing. + String findSparkSubmit() throws IOException { + String script = isWindows() ? "spark-submit.cmd" : "spark-submit"; + return join(File.separator, builder.getSparkHome(), "bin", script); + } + + private String getLoggerName() throws IOException { + return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME); + } + private static class ArgumentValidator extends SparkSubmitOptionParser { private final boolean hasValue; diff --git a/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java b/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java new file mode 100644 index 0000000000000000000000000000000000000000..ba044d3e4c0d7047ef057aa82292302254e89f7d --- /dev/null +++ b/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.launcher; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.stream.Collectors; +import static java.nio.file.attribute.PosixFilePermission.*; + +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import static org.junit.Assert.*; +import static org.junit.Assume.*; + +import static org.apache.spark.launcher.CommandBuilderUtils.*; + +public class OutputRedirectionSuite extends BaseSuite { + + private static final List<String> MESSAGES = new ArrayList<>(); + + private static final List<String> TEST_SCRIPT = Arrays.asList( + "#!/bin/sh", + "echo \"output\"", + "echo \"error\" 1>&2"); + + private static File TEST_SCRIPT_PATH; + + @AfterClass + public static void cleanupClass() throws Exception { + if (TEST_SCRIPT_PATH != null) { + TEST_SCRIPT_PATH.delete(); + TEST_SCRIPT_PATH = null; + } + } + + @BeforeClass + public static void setupClass() throws Exception { + TEST_SCRIPT_PATH = File.createTempFile("output-redir-test", ".sh"); + Files.setPosixFilePermissions(TEST_SCRIPT_PATH.toPath(), + EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE)); + Files.write(TEST_SCRIPT_PATH.toPath(), TEST_SCRIPT); + } + + @Before + public void cleanupLog() { + MESSAGES.clear(); + } + + @Test + public void testRedirectsSimple() throws Exception { + SparkLauncher launcher = new SparkLauncher(); + launcher.redirectError(ProcessBuilder.Redirect.PIPE); + assertNotNull(launcher.errorStream); + assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.PIPE); + + launcher.redirectOutput(ProcessBuilder.Redirect.PIPE); + assertNotNull(launcher.outputStream); + assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.PIPE); + } + + @Test + public void testRedirectLastWins() throws Exception { + SparkLauncher launcher = new SparkLauncher(); + launcher.redirectError(ProcessBuilder.Redirect.PIPE) + .redirectError(ProcessBuilder.Redirect.INHERIT); + assertEquals(launcher.errorStream.type(), ProcessBuilder.Redirect.Type.INHERIT); + + launcher.redirectOutput(ProcessBuilder.Redirect.PIPE) + .redirectOutput(ProcessBuilder.Redirect.INHERIT); + assertEquals(launcher.outputStream.type(), ProcessBuilder.Redirect.Type.INHERIT); + } + + @Test + public void testRedirectToLog() throws Exception { + assumeFalse(isWindows()); + + ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication(); + waitFor(handle); + + assertTrue(MESSAGES.contains("output")); + assertTrue(MESSAGES.contains("error")); + } + + @Test + public void testRedirectErrorToLog() throws Exception { + assumeFalse(isWindows()); + + Path err = Files.createTempFile("stderr", "txt"); + + ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + .redirectError(err.toFile()) + .startApplication(); + waitFor(handle); + + assertTrue(MESSAGES.contains("output")); + assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList())); + } + + @Test + public void testRedirectOutputToLog() throws Exception { + assumeFalse(isWindows()); + + Path out = Files.createTempFile("stdout", "txt"); + + ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + .redirectOutput(out.toFile()) + .startApplication(); + waitFor(handle); + + assertTrue(MESSAGES.contains("error")); + assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList())); + } + + @Test + public void testNoRedirectToLog() throws Exception { + assumeFalse(isWindows()); + + Path out = Files.createTempFile("stdout", "txt"); + Path err = Files.createTempFile("stderr", "txt"); + + ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + .redirectError(err.toFile()) + .redirectOutput(out.toFile()) + .startApplication(); + waitFor(handle); + + assertTrue(MESSAGES.isEmpty()); + assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList())); + assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList())); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadLogRedirect() throws Exception { + new SparkLauncher() + .redirectError() + .redirectOutput(Files.createTempFile("stdout", "txt").toFile()) + .redirectToLog("foo") + .launch() + .waitFor(); + } + + @Test(expected = IllegalArgumentException.class) + public void testRedirectErrorTwiceFails() throws Exception { + new SparkLauncher() + .redirectError() + .redirectError(Files.createTempFile("stderr", "txt").toFile()) + .launch() + .waitFor(); + } + + private void waitFor(ChildProcAppHandle handle) throws Exception { + try { + while (handle.isRunning()) { + Thread.sleep(10); + } + } finally { + // Explicit unregister from server since the handle doesn't yet do that when the + // process finishes by itself. + LauncherServer server = LauncherServer.getServerInstance(); + if (server != null) { + server.unregister(handle); + } + } + } + + private static class TestSparkLauncher extends SparkLauncher { + + TestSparkLauncher() { + setAppResource("outputredirtest"); + } + + @Override + String findSparkSubmit() { + return TEST_SCRIPT_PATH.getAbsolutePath(); + } + + } + + /** + * A log4j appender used by child apps of this test. It records all messages logged through it in + * memory so the test can check them. + */ + public static class LogAppender extends AppenderSkeleton { + + @Override + protected void append(LoggingEvent event) { + MESSAGES.add(event.getMessage().toString()); + } + + @Override + public boolean requiresLayout() { + return false; + } + + @Override + public void close() { + + } + + } +} diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties index 744c456cb29c1178400f60151e5158c93f49e896..bd982b1d44aa8d63965a92c1cdc2c1bb38feebd2 100644 --- a/launcher/src/test/resources/log4j.properties +++ b/launcher/src/test/resources/log4j.properties @@ -29,5 +29,9 @@ log4j.appender.childproc.target=System.err log4j.appender.childproc.layout=org.apache.log4j.PatternLayout log4j.appender.childproc.layout.ConversionPattern=%t: %m%n +log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender +log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest +log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false + # Ignore messages below warning level from Jetty, because it's a bit verbose log4j.logger.org.spark_project.jetty=WARN