From 2075bf8ef6035fd7606bcf20dc2cd7d7b9cda446 Mon Sep 17 00:00:00 2001
From: Sean Owen <sowen@cloudera.com>
Date: Fri, 1 Jul 2016 09:22:27 +0100
Subject: [PATCH] [SPARK-16182][CORE] Utils.scala -- terminateProcess() should
 call Process.destroyForcibly() if and only if Process.destroy() fails

## What changes were proposed in this pull request?

Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown

While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation.

## How was this patch tested?

Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here.

Author: Sean Owen <sowen@cloudera.com>

Closes #13973 from srowen/SPARK-16182.
---
 .../scala/org/apache/spark/util/Utils.scala   | 76 +++++++++++--------
 .../org/apache/spark/util/UtilsSuite.scala    |  2 +-
 2 files changed, 47 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f77cc2f9b7..0c23f3cd35 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Terminates a process waiting for at most the specified duration. Returns whether
-   * the process terminated.
+   * Terminates a process waiting for at most the specified duration.
+   *
+   * @return the process exit value if it was successfully terminated, else None
    */
   def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = {
-    try {
-      // Java8 added a new API which will more forcibly kill the process. Use that if available.
-      val destroyMethod = process.getClass().getMethod("destroyForcibly");
-      destroyMethod.setAccessible(true)
-      destroyMethod.invoke(process)
-    } catch {
-      case NonFatal(e) =>
-        if (!e.isInstanceOf[NoSuchMethodException]) {
-          logWarning("Exception when attempting to kill process", e)
-        }
-        process.destroy()
-    }
+    // Politely destroy first
+    process.destroy()
+
     if (waitForProcess(process, timeoutMs)) {
+      // Successful exit
       Option(process.exitValue())
     } else {
-      None
+      // Java 8 added a new API which will more forcibly kill the process. Use that if available.
+      try {
+        classOf[Process].getMethod("destroyForcibly").invoke(process)
+      } catch {
+        case _: NoSuchMethodException => return None // Not available; give up
+        case NonFatal(e) => logWarning("Exception when attempting to kill process", e)
+      }
+      // Wait, again, although this really should return almost immediately
+      if (waitForProcess(process, timeoutMs)) {
+        Option(process.exitValue())
+      } else {
+        logWarning("Timed out waiting to forcibly kill process")
+        None
+      }
     }
   }
 
   /**
    * Wait for a process to terminate for at most the specified duration.
-   * Return whether the process actually terminated after the given timeout.
+   *
+   * @return whether the process actually terminated before the given timeout.
    */
   def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
-    var terminated = false
-    val startTime = System.currentTimeMillis
-    while (!terminated) {
-      try {
-        process.exitValue()
-        terminated = true
-      } catch {
-        case e: IllegalThreadStateException =>
-          // Process not terminated yet
-          if (System.currentTimeMillis - startTime > timeoutMs) {
-            return false
+    try {
+      // Use Java 8 method if available
+      classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit])
+        .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS)
+        .asInstanceOf[Boolean]
+    } catch {
+      case _: NoSuchMethodError =>
+        // Otherwise implement it manually
+        var terminated = false
+        val startTime = System.currentTimeMillis
+        while (!terminated) {
+          try {
+            process.exitValue()
+            terminated = true
+          } catch {
+            case e: IllegalThreadStateException =>
+              // Process not terminated yet
+              if (System.currentTimeMillis - startTime > timeoutMs) {
+                return false
+              }
+              Thread.sleep(100)
           }
-          Thread.sleep(100)
-      }
+        }
+        true
     }
-    true
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index df279b5a37..f5d0fb00b7 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
           assert(terminated.isDefined)
           Utils.waitForProcess(process, 5000)
           val duration = System.currentTimeMillis() - start
-          assert(duration < 5000)
+          assert(duration < 6000) // add a little extra time to allow a force kill to finish
           assert(!pidExists(pid))
         } finally {
           signal(pid, "SIGKILL")
-- 
GitLab