diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java index 19861b81c256292544307d645fa7182455123f07..db4fc26cdf353896914a72aa3711672f4cb96e9d 100644 --- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java +++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java @@ -116,11 +116,11 @@ public class SparkLauncherSuite { .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path")) .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow") .setMainClass(SparkLauncherTestApp.class.getName()) + .redirectError() .addAppArgs("proc"); final Process app = launcher.launch(); - new OutputRedirector(app.getInputStream(), TF); - new OutputRedirector(app.getErrorStream(), TF); + new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF); assertEquals(0, app.waitFor()); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 3ce4b79ee9c0565fac9d6cf5b8028250029f8275..bf916406f1471ade138d03bc71c9af238e50038e 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle { private final String secret; private final LauncherServer server; - private Process childProc; + private volatile Process childProc; private boolean disposed; private LauncherConnection connection; private List<Listener> listeners; @@ -96,18 +96,14 @@ class ChildProcAppHandle implements SparkAppHandle { @Override public synchronized void kill() { - if (!disposed) { - disconnect(); - } + disconnect(); if (childProc != null) { - try { - childProc.exitValue(); - } catch (IllegalThreadStateException e) { + if (childProc.isAlive()) { childProc.destroyForcibly(); - } finally { - childProc = null; } + childProc = null; } + setState(State.KILLED); } String getSecret() { @@ -118,7 +114,13 @@ class ChildProcAppHandle implements SparkAppHandle { this.childProc = childProc; if (logStream != null) { this.redirector = new OutputRedirector(logStream, loggerName, - SparkLauncher.REDIRECTOR_FACTORY); + SparkLauncher.REDIRECTOR_FACTORY, this); + } else { + // If there is no log redirection, spawn a thread that will wait for the child process + // to finish. + Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild); + waiter.setDaemon(true); + waiter.start(); } } @@ -134,7 +136,7 @@ class ChildProcAppHandle implements SparkAppHandle { return connection; } - void setState(State s) { + synchronized void setState(State s) { if (!state.isFinal()) { state = s; fireEvent(false); @@ -144,17 +146,48 @@ class ChildProcAppHandle implements SparkAppHandle { } } - void setAppId(String appId) { + synchronized void setAppId(String appId) { this.appId = appId; fireEvent(true); } - // Visible for testing. - boolean isRunning() { - return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive()); + /** + * Wait for the child process to exit and update the handle's state if necessary, accoding to + * the exit code. + */ + void monitorChild() { + while (childProc.isAlive()) { + try { + childProc.waitFor(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e); + } + } + + synchronized (this) { + if (disposed) { + return; + } + + disconnect(); + + int ec; + try { + ec = childProc.exitValue(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e); + ec = 1; + } + + // Only override the success state; leave other fail states alone. + if (!state.isFinal() || (ec != 0 && state == State.FINISHED)) { + state = State.LOST; + fireEvent(false); + } + } } - private synchronized void fireEvent(boolean isInfoChanged) { + private void fireEvent(boolean isInfoChanged) { if (listeners != null) { for (Listener l : listeners) { if (isInfoChanged) { diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index 63abae9a1c49d32541ddae783869cbb61e494b50..6f4b0bb38e031e42f3987777b4eff27f84d52f30 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -34,18 +34,24 @@ class OutputRedirector { private final BufferedReader reader; private final Logger sink; private final Thread thread; + private final ChildProcAppHandle callback; private volatile boolean active; - OutputRedirector(InputStream in, ThreadFactory tf) { - this(in, OutputRedirector.class.getName(), tf); + OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { + this(in, loggerName, tf, null); } - OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { + OutputRedirector( + InputStream in, + String loggerName, + ThreadFactory tf, + ChildProcAppHandle callback) { this.active = true; this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); this.thread = tf.newThread(this::redirect); this.sink = Logger.getLogger(loggerName); + this.callback = callback; thread.start(); } @@ -59,6 +65,10 @@ class OutputRedirector { } } catch (IOException e) { sink.log(Level.FINE, "Error reading child process output.", e); + } finally { + if (callback != null) { + callback.monitorChild(); + } } } diff --git a/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java similarity index 81% rename from launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java rename to launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java index ba044d3e4c0d7047ef057aa82292302254e89f7d..64a87b365d6a94f5d34a6e887b6360139e2a295f 100644 --- a/launcher/src/test/java/org/apache/spark/launcher/OutputRedirectionSuite.java +++ b/launcher/src/test/java/org/apache/spark/launcher/ChildProcAppHandleSuite.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.nio.file.attribute.PosixFilePermission.*; @@ -39,7 +40,7 @@ import static org.junit.Assume.*; import static org.apache.spark.launcher.CommandBuilderUtils.*; -public class OutputRedirectionSuite extends BaseSuite { +public class ChildProcAppHandleSuite extends BaseSuite { private static final List<String> MESSAGES = new ArrayList<>(); @@ -99,7 +100,8 @@ public class OutputRedirectionSuite extends BaseSuite { public void testRedirectToLog() throws Exception { assumeFalse(isWindows()); - ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication(); + SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + .startApplication(); waitFor(handle); assertTrue(MESSAGES.contains("output")); @@ -112,7 +114,7 @@ public class OutputRedirectionSuite extends BaseSuite { Path err = Files.createTempFile("stderr", "txt"); - ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() .redirectError(err.toFile()) .startApplication(); waitFor(handle); @@ -127,7 +129,7 @@ public class OutputRedirectionSuite extends BaseSuite { Path out = Files.createTempFile("stdout", "txt"); - ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() + SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher() .redirectOutput(out.toFile()) .startApplication(); waitFor(handle); @@ -173,17 +175,37 @@ public class OutputRedirectionSuite extends BaseSuite { .waitFor(); } - private void waitFor(ChildProcAppHandle handle) throws Exception { + @Test + public void testProcMonitorWithOutputRedirection() throws Exception { + File err = Files.createTempFile("out", "txt").toFile(); + SparkAppHandle handle = new TestSparkLauncher() + .redirectError() + .redirectOutput(err) + .startApplication(); + waitFor(handle); + assertEquals(SparkAppHandle.State.LOST, handle.getState()); + } + + @Test + public void testProcMonitorWithLogRedirection() throws Exception { + SparkAppHandle handle = new TestSparkLauncher() + .redirectToLog(getClass().getName()) + .startApplication(); + waitFor(handle); + assertEquals(SparkAppHandle.State.LOST, handle.getState()); + } + + private void waitFor(SparkAppHandle handle) throws Exception { + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); try { - while (handle.isRunning()) { - Thread.sleep(10); + while (!handle.getState().isFinal()) { + assertTrue("Timed out waiting for handle to transition to final state.", + System.nanoTime() < deadline); + TimeUnit.MILLISECONDS.sleep(10); } } finally { - // Explicit unregister from server since the handle doesn't yet do that when the - // process finishes by itself. - LauncherServer server = LauncherServer.getServerInstance(); - if (server != null) { - server.unregister(handle); + if (!handle.getState().isFinal()) { + handle.kill(); } } } diff --git a/launcher/src/test/resources/log4j.properties b/launcher/src/test/resources/log4j.properties index bd982b1d44aa8d63965a92c1cdc2c1bb38feebd2..366a6b9f817c05d861dd1f56c39272c02d5acde5 100644 --- a/launcher/src/test/resources/log4j.properties +++ b/launcher/src/test/resources/log4j.properties @@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err log4j.appender.childproc.layout=org.apache.log4j.PatternLayout log4j.appender.childproc.layout.ConversionPattern=%t: %m%n -log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender +log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false