Skip to content
Snippets Groups Projects
Commit 767d288b authored by Marcelo Vanzin's avatar Marcelo Vanzin
Browse files

[SPARK-11655][CORE] Fix deadlock in handling of launcher stop().

The stop() callback was trying to close the launcher connection in the
same thread that handles connection data, which ended up causing a
deadlock. So avoid that by dispatching the stop() request in its own
thread.

On top of that, add some exception safety to a few parts of the code,
and use "destroyForcibly" from Java 8 if it's available, to force
kill the child process. The flip side is that "kill()" may not actually
work if running Java 7.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9633 from vanzin/SPARK-11655.
parent d292f748
No related branches found
No related tags found
No related merge requests found
...@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket} ...@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}
import org.apache.spark.SPARK_VERSION import org.apache.spark.SPARK_VERSION
import org.apache.spark.launcher.LauncherProtocol._ import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.ThreadUtils import org.apache.spark.util.{ThreadUtils, Utils}
/** /**
* A class that can be used to talk to a launcher server. Users should extend this class to * A class that can be used to talk to a launcher server. Users should extend this class to
...@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend { ...@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
*/ */
protected def onDisconnected() : Unit = { } protected def onDisconnected() : Unit = { }
private def fireStopRequest(): Unit = {
val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
override def run(): Unit = Utils.tryLogNonFatalError {
onStopRequest()
}
})
thread.start()
}
private class BackendConnection(s: Socket) extends LauncherConnection(s) { private class BackendConnection(s: Socket) extends LauncherConnection(s) {
override protected def handle(m: Message): Unit = m match { override protected def handle(m: Message): Unit = m match {
case _: Stop => case _: Stop =>
onStopRequest() fireStopRequest()
case _ => case _ =>
throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}") throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
......
...@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend( ...@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
} }
private def stop(finalState: SparkAppHandle.State): Unit = synchronized { private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
stopping = true try {
stopping = true
launcherBackend.setState(finalState) super.stop()
launcherBackend.close() client.stop()
super.stop() val callback = shutdownCallback
client.stop() if (callback != null) {
callback(this)
val callback = shutdownCallback }
if (callback != null) { } finally {
callback(this) launcherBackend.setState(finalState)
launcherBackend.close()
} }
} }
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
package org.apache.spark.launcher; package org.apache.spark.launcher;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
...@@ -102,8 +103,20 @@ class ChildProcAppHandle implements SparkAppHandle { ...@@ -102,8 +103,20 @@ class ChildProcAppHandle implements SparkAppHandle {
disconnect(); disconnect();
} }
if (childProc != null) { if (childProc != null) {
childProc.destroy(); try {
childProc = null; childProc.exitValue();
} catch (IllegalThreadStateException e) {
// Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
// fall back to the old API if it's not there.
try {
Method destroy = childProc.getClass().getMethod("destroyForcibly");
destroy.invoke(childProc);
} catch (Exception inner) {
childProc.destroy();
}
} finally {
childProc = null;
}
} }
} }
......
...@@ -89,6 +89,9 @@ public interface SparkAppHandle { ...@@ -89,6 +89,9 @@ public interface SparkAppHandle {
* Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
* a {@link #stop()} message to the application, so it's recommended that users first try to * a {@link #stop()} message to the application, so it's recommended that users first try to
* stop the application cleanly and only resort to this method if that fails. * stop the application cleanly and only resort to this method if that fails.
* <p>
* Note that if the application is running as a child process, this method fail to kill the
* process when using Java 7. This may happen if, for example, the application is deadlocked.
*/ */
void kill(); void kill();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment