From a5719804c5ed99ce36bd0dd230ab8b3b7a3b92e3 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin <vanzin@cloudera.com>
Date: Thu, 15 Oct 2015 14:46:40 -0700
Subject: [PATCH] [SPARK-11071] [LAUNCHER] Fix flakiness in
 LauncherServerSuite::timeout.

The test could fail depending on scheduling of the various threads
involved; the change removes some sources of races, while making the
test a little more resilient by trying a few times before giving up.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #9079 from vanzin/SPARK-11071.
---
 .../apache/spark/launcher/LauncherServer.java |  9 ++++-
 .../spark/launcher/LauncherServerSuite.java   | 35 ++++++++++++++-----
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
index c5fd40816d..d099ee9aa9 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java
@@ -242,7 +242,14 @@ class LauncherServer implements Closeable {
           synchronized (clients) {
             clients.add(clientConnection);
           }
-          timeoutTimer.schedule(timeout, getConnectionTimeout());
+          long timeoutMs = getConnectionTimeout();
+          // 0 is used for testing to avoid issues with clock resolution / thread scheduling,
+          // and force an immediate timeout.
+          if (timeoutMs > 0) {
+            timeoutTimer.schedule(timeout, getConnectionTimeout());
+          } else {
+            timeout.run();
+          }
         }
       }
     } catch (IOException ioe) {
diff --git a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
index 27cd1061a1..dc8fbb58d8 100644
--- a/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
+++ b/launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java
@@ -121,12 +121,12 @@ public class LauncherServerSuite extends BaseSuite {
 
   @Test
   public void testTimeout() throws Exception {
-    final long TEST_TIMEOUT = 10L;
-
     ChildProcAppHandle handle = null;
     TestClient client = null;
     try {
-      SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, String.valueOf(TEST_TIMEOUT));
+      // LauncherServer will immediately close the server-side socket when the timeout is set
+      // to 0.
+      SparkLauncher.setConfig(SparkLauncher.CHILD_CONNECTION_TIMEOUT, "0");
 
       handle = LauncherServer.newAppHandle();
 
@@ -134,12 +134,29 @@ public class LauncherServerSuite extends BaseSuite {
         LauncherServer.getServerInstance().getPort());
       client = new TestClient(s);
 
-      Thread.sleep(TEST_TIMEOUT * 10);
-      try {
-        client.send(new Hello(handle.getSecret(), "1.4.0"));
-        fail("Expected exception caused by connection timeout.");
-      } catch (IllegalStateException e) {
-        // Expected.
+      // Try a few times since the client-side socket may not reflect the server-side close
+      // immediately.
+      boolean helloSent = false;
+      int maxTries = 10;
+      for (int i = 0; i < maxTries; i++) {
+        try {
+          if (!helloSent) {
+            client.send(new Hello(handle.getSecret(), "1.4.0"));
+            helloSent = true;
+          } else {
+            client.send(new SetAppId("appId"));
+          }
+          fail("Expected exception caused by connection timeout.");
+        } catch (IllegalStateException | IOException e) {
+          // Expected.
+          break;
+        } catch (AssertionError e) {
+          if (i < maxTries - 1) {
+            Thread.sleep(100);
+          } else {
+            throw new AssertionError("Test failed after " + maxTries + " attempts.", e);
+          }
+        }
       }
     } finally {
       SparkLauncher.launcherConfig.remove(SparkLauncher.CHILD_CONNECTION_TIMEOUT);
-- 
GitLab