Skip to content
Snippets Groups Projects
Commit cba826d0 authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-17742][CORE] Handle child process exit in SparkLauncher.

Currently the launcher handle does not monitor the child spark-submit
process it launches; this means that if the child exits with an error,
the handle's state will never change, and the launching application will
not know that the Spark application has failed.

This change adds code to monitor the child process, and changes the
handle state appropriately when the child process exits.

Tested with added unit tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18877 from vanzin/SPARK-17742.
parent 14bdb25f
No related branches found
No related tags found
No related merge requests found
......@@ -116,11 +116,11 @@ public class SparkLauncherSuite {
.setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
.addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
.setMainClass(SparkLauncherTestApp.class.getName())
.redirectError()
.addAppArgs("proc");
final Process app = launcher.launch();
new OutputRedirector(app.getInputStream(), TF);
new OutputRedirector(app.getErrorStream(), TF);
new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
assertEquals(0, app.waitFor());
}
......
......@@ -34,7 +34,7 @@ class ChildProcAppHandle implements SparkAppHandle {
private final String secret;
private final LauncherServer server;
private Process childProc;
private volatile Process childProc;
private boolean disposed;
private LauncherConnection connection;
private List<Listener> listeners;
......@@ -96,18 +96,14 @@ class ChildProcAppHandle implements SparkAppHandle {
@Override
public synchronized void kill() {
if (!disposed) {
disconnect();
}
disconnect();
if (childProc != null) {
try {
childProc.exitValue();
} catch (IllegalThreadStateException e) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
} finally {
childProc = null;
}
childProc = null;
}
setState(State.KILLED);
}
String getSecret() {
......@@ -118,7 +114,13 @@ class ChildProcAppHandle implements SparkAppHandle {
this.childProc = childProc;
if (logStream != null) {
this.redirector = new OutputRedirector(logStream, loggerName,
SparkLauncher.REDIRECTOR_FACTORY);
SparkLauncher.REDIRECTOR_FACTORY, this);
} else {
// If there is no log redirection, spawn a thread that will wait for the child process
// to finish.
Thread waiter = SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild);
waiter.setDaemon(true);
waiter.start();
}
}
......@@ -134,7 +136,7 @@ class ChildProcAppHandle implements SparkAppHandle {
return connection;
}
void setState(State s) {
synchronized void setState(State s) {
if (!state.isFinal()) {
state = s;
fireEvent(false);
......@@ -144,17 +146,48 @@ class ChildProcAppHandle implements SparkAppHandle {
}
}
void setAppId(String appId) {
synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
// Visible for testing.
boolean isRunning() {
return childProc == null || childProc.isAlive() || (redirector != null && redirector.isAlive());
/**
 * Wait for the child process to exit and update the handle's state if necessary, according to
 * the exit code.
 *
 * <p>If the handle has no child process to watch (the field is null, which is what
 * {@code kill()} leaves behind), this method returns immediately without touching the state.
 */
void monitorChild() {
  // Read the volatile field exactly once. kill() sets childProc to null, so re-reading the
  // field while waiting could throw a NullPointerException in the monitoring thread.
  Process proc = childProc;
  if (proc == null) {
    // Nothing to monitor; the process reference has already been cleared.
    return;
  }

  // waitFor() may be interrupted; keep waiting until the process is really gone.
  while (proc.isAlive()) {
    try {
      proc.waitFor();
    } catch (Exception e) {
      LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
    }
  }

  synchronized (this) {
    if (disposed) {
      return;
    }

    disconnect();

    int ec;
    try {
      ec = proc.exitValue();
    } catch (Exception e) {
      LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
      ec = 1;
    }

    // Only override the success state; leave other fail states alone.
    if (!state.isFinal() || (ec != 0 && state == State.FINISHED)) {
      state = State.LOST;
      fireEvent(false);
    }
  }
}
private synchronized void fireEvent(boolean isInfoChanged) {
private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
......
......@@ -34,18 +34,24 @@ class OutputRedirector {
private final BufferedReader reader;
private final Logger sink;
private final Thread thread;
private final ChildProcAppHandle callback;
private volatile boolean active;
OutputRedirector(InputStream in, ThreadFactory tf) {
this(in, OutputRedirector.class.getName(), tf);
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
this(in, loggerName, tf, null);
}
OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
OutputRedirector(
InputStream in,
String loggerName,
ThreadFactory tf,
ChildProcAppHandle callback) {
this.active = true;
this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
this.thread = tf.newThread(this::redirect);
this.sink = Logger.getLogger(loggerName);
this.callback = callback;
thread.start();
}
......@@ -59,6 +65,10 @@ class OutputRedirector {
}
} catch (IOException e) {
sink.log(Level.FINE, "Error reading child process output.", e);
} finally {
if (callback != null) {
callback.monitorChild();
}
}
}
......
......@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static java.nio.file.attribute.PosixFilePermission.*;
......@@ -39,7 +40,7 @@ import static org.junit.Assume.*;
import static org.apache.spark.launcher.CommandBuilderUtils.*;
public class OutputRedirectionSuite extends BaseSuite {
public class ChildProcAppHandleSuite extends BaseSuite {
private static final List<String> MESSAGES = new ArrayList<>();
......@@ -99,7 +100,8 @@ public class OutputRedirectionSuite extends BaseSuite {
public void testRedirectToLog() throws Exception {
assumeFalse(isWindows());
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher().startApplication();
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.startApplication();
waitFor(handle);
assertTrue(MESSAGES.contains("output"));
......@@ -112,7 +114,7 @@ public class OutputRedirectionSuite extends BaseSuite {
Path err = Files.createTempFile("stderr", "txt");
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectError(err.toFile())
.startApplication();
waitFor(handle);
......@@ -127,7 +129,7 @@ public class OutputRedirectionSuite extends BaseSuite {
Path out = Files.createTempFile("stdout", "txt");
ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
.redirectOutput(out.toFile())
.startApplication();
waitFor(handle);
......@@ -173,17 +175,37 @@ public class OutputRedirectionSuite extends BaseSuite {
.waitFor();
}
private void waitFor(ChildProcAppHandle handle) throws Exception {
/**
 * Starts the test application with error redirection enabled and its output sent to a
 * temporary file, waits for the handle to reach a final state, and checks that the state
 * is LOST.
 */
@Test
public void testProcMonitorWithOutputRedirection() throws Exception {
  // Temporary file that receives the child's redirected output.
  File childOutput = Files.createTempFile("out", "txt").toFile();

  SparkAppHandle handle = new TestSparkLauncher()
    .redirectError()
    .redirectOutput(childOutput)
    .startApplication();
  waitFor(handle);

  SparkAppHandle.State finalState = handle.getState();
  assertEquals(SparkAppHandle.State.LOST, finalState);
}
/**
 * Starts the test application with its output redirected to a logger named after this
 * suite, waits for the handle to reach a final state, and checks that the state is LOST.
 */
@Test
public void testProcMonitorWithLogRedirection() throws Exception {
  String loggerName = getClass().getName();

  SparkAppHandle handle = new TestSparkLauncher()
    .redirectToLog(loggerName)
    .startApplication();
  waitFor(handle);

  SparkAppHandle.State finalState = handle.getState();
  assertEquals(SparkAppHandle.State.LOST, finalState);
}
private void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
while (handle.isRunning()) {
Thread.sleep(10);
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
// Explicit unregister from server since the handle doesn't yet do that when the
// process finishes by itself.
LauncherServer server = LauncherServer.getServerInstance();
if (server != null) {
server.unregister(handle);
if (!handle.getState().isFinal()) {
handle.kill();
}
}
}
......
......@@ -29,7 +29,7 @@ log4j.appender.childproc.target=System.err
log4j.appender.childproc.layout=org.apache.log4j.PatternLayout
log4j.appender.childproc.layout.ConversionPattern=%t: %m%n
log4j.appender.outputredirtest=org.apache.spark.launcher.OutputRedirectionSuite$LogAppender
log4j.appender.outputredirtest=org.apache.spark.launcher.ChildProcAppHandleSuite$LogAppender
log4j.logger.org.apache.spark.launcher.app.outputredirtest=INFO, outputredirtest
log4j.logger.org.apache.spark.launcher.app.outputredirtest.additivity=false
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment