Skip to content
Snippets Groups Projects
Commit c409e23a authored by Subroto Sanyal's avatar Subroto Sanyal Committed by Marcelo Vanzin
Browse files

[SPARK-15652][LAUNCHER] Added a new State (LOST) for the listeners of SparkLauncher

## What changes were proposed in this pull request?
This situation can happen when the LauncherConnection gets an exception while reading through the socket and terminating silently without notifying making the client/listener think that the job is still in previous state.
The fix force sends a notification to client that the job finished with unknown status and let client handle it accordingly.

## How was this patch tested?
Added a unit test.

Author: Subroto Sanyal <ssanyal@datameer.com>

Closes #13497 from subrotosanyal/SPARK-15652-handle-spark-submit-jvm-crash.
parent 36d3dfa5
No related branches found
No related tags found
No related merge requests found
......@@ -337,6 +337,10 @@ class LauncherServer implements Closeable {
}
super.close();
if (handle != null) {
if (!handle.getState().isFinal()) {
LOG.log(Level.WARNING, "Lost connection to spark application.");
handle.setState(SparkAppHandle.State.LOST);
}
handle.disconnect();
}
}
......
......@@ -46,7 +46,9 @@ public interface SparkAppHandle {
/** The application finished with a failed status. */
FAILED(true),
/** The application was killed. */
KILLED(true);
KILLED(true),
/** The Spark Submit JVM exited with a unknown status. */
LOST(true);
private final boolean isFinal;
......
......@@ -152,6 +152,37 @@ public class LauncherServerSuite extends BaseSuite {
}
}
@Test
public void testSparkSubmitVmShutsDown() throws Exception {
ChildProcAppHandle handle = LauncherServer.newAppHandle();
TestClient client = null;
final Semaphore semaphore = new Semaphore(0);
try {
Socket s = new Socket(InetAddress.getLoopbackAddress(),
LauncherServer.getServerInstance().getPort());
handle.addListener(new SparkAppHandle.Listener() {
public void stateChanged(SparkAppHandle handle) {
semaphore.release();
}
public void infoChanged(SparkAppHandle handle) {
semaphore.release();
}
});
client = new TestClient(s);
client.send(new Hello(handle.getSecret(), "1.4.0"));
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
// Make sure the server matched the client to the handle.
assertNotNull(handle.getConnection());
close(client);
assertTrue(semaphore.tryAcquire(30, TimeUnit.SECONDS));
assertEquals(SparkAppHandle.State.LOST, handle.getState());
} finally {
kill(handle);
close(client);
client.clientThread.join();
}
}
private void kill(SparkAppHandle handle) {
if (handle != null) {
handle.kill();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment