From 3d2b6f56e38ce867ae8819752fd693adab9a8cc9 Mon Sep 17 00:00:00 2001
From: Tathagata Das <tathagata.das1565@gmail.com>
Date: Thu, 10 Mar 2016 14:38:19 -0800
Subject: [PATCH] [SQL][TEST] Increased timeouts to reduce flakiness in
 ContinuousQueryManagerSuite

## What changes were proposed in this pull request?

ContinuousQueryManager is sometimes flaky on Jenkins. I could not reproduce it on my machine, so I guess it about the waiting times which causes problems if Jenkins is loaded. I have increased the wait time in the hope that it will be less flaky.

## How was this patch tested?

I reran the unit test many times on a loop in my machine. I am going to run it a few time in Jenkins, that's the real test.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #11638 from tdas/cqm-flaky-test.
---
 .../ContinuousQueryManagerSuite.scala         | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
index 35bb9fdbfd..45e824ad63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
@@ -148,9 +148,9 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
       // awaitAnyTermination should be blocking or non-blocking depending on timeout values
       testAwaitAnyTermination(
         ExpectBlocked,
-        awaitTimeout = 2 seconds,
+        awaitTimeout = 4 seconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 1 second)
+        testBehaviorFor = 2 seconds)
 
       testAwaitAnyTermination(
         ExpectNotBlocked,
@@ -162,20 +162,20 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
       val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
       testAwaitAnyTermination(
         ExpectNotBlocked,
-        awaitTimeout = 1 second,
+        awaitTimeout = 2 seconds,
         expectedReturnedValue = true,
-        testBehaviorFor = 2 seconds)
+        testBehaviorFor = 4 seconds)
       require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
       // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
       testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
+        ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
 
       // Resetting termination should make awaitAnyTermination() blocking again
       sqlContext.streams.resetTerminated()
       testAwaitAnyTermination(
         ExpectBlocked,
-        awaitTimeout = 2 seconds,
+        awaitTimeout = 4 seconds,
         expectedReturnedValue = false,
         testBehaviorFor = 1 second)
 
@@ -184,31 +184,31 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
       val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
       testAwaitAnyTermination(
         ExpectException[SparkException],
-        awaitTimeout = 1 second,
+        awaitTimeout = 1 seconds,
         testBehaviorFor = 2 seconds)
       require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
 
       // All subsequent calls to awaitAnyTermination should throw the exception
       testAwaitAnyTermination(
         ExpectException[SparkException],
-        awaitTimeout = 1 second,
-        testBehaviorFor = 2 seconds)
+        awaitTimeout = 2 seconds,
+        testBehaviorFor = 4 seconds)
 
       // Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked
       sqlContext.streams.resetTerminated()
-      val q3 = stopRandomQueryAsync(1 second, withError = true)
+      val q3 = stopRandomQueryAsync(2 seconds, withError = true)
       testAwaitAnyTermination(
         ExpectNotBlocked,
         awaitTimeout = 100 milliseconds,
         expectedReturnedValue = false,
-        testBehaviorFor = 2 seconds)
+        testBehaviorFor = 4 seconds)
 
       // After that query is stopped, awaitAnyTerm should throw exception
       eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
       testAwaitAnyTermination(
         ExpectException[SparkException],
         awaitTimeout = 100 milliseconds,
-        testBehaviorFor = 2 seconds)
+        testBehaviorFor = 4 seconds)
 
 
       // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws
@@ -217,12 +217,12 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
 
       val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
       testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 1 second, expectedReturnedValue = true)
+        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
       require(!q4.isActive)
       val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
       eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
       // After q5 terminates with exception, awaitAnyTerm should start throwing exception
-      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 100 milliseconds)
+      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
     }
   }
 
@@ -260,7 +260,7 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
       expectedBehavior: ExpectedBehavior,
       expectedReturnedValue: Boolean = false,
       awaitTimeout: Span = null,
-      testBehaviorFor: Span = 2 seconds
+      testBehaviorFor: Span = 4 seconds
     ): Unit = {
 
     def awaitTermFunc(): Unit = {
-- 
GitLab