From 5fb70831bd3acf7e1d9933986ccce12c3872432b Mon Sep 17 00:00:00 2001
From: Shixiong Zhu <shixiong@databricks.com>
Date: Fri, 17 Mar 2017 11:12:23 -0700
Subject: [PATCH] [SPARK-19986][TESTS] Make
 pyspark.streaming.tests.CheckpointTests more stable

## What changes were proposed in this pull request?

Sometimes, `CheckpointTests` hangs on a busy machine because the streaming jobs are too slow to keep up. Locally, I observed the scheduling delay growing steadily for dozens of seconds.

This PR increases the batch interval from 0.5 seconds to 2 seconds to generate fewer Spark jobs, which should make `pyspark.streaming.tests.CheckpointTests` more stable. It also replaces `sleep` with `awaitTerminationOrTimeout`, so that if the streaming job fails, the test fails as well instead of hanging.
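
For illustration, here is a minimal sketch of the new polling pattern (assuming a running `StreamingContext` named `ssc` and an output directory `outputd`, mirroring the test code in the diff below; this is not the exact test code):

```python
import os

# Hypothetical setup: `ssc` is a running StreamingContext and `outputd` is
# the directory the streaming job writes its results to.
while not os.listdir(outputd):
    # awaitTerminationOrTimeout(timeout) waits up to `timeout` seconds and
    # returns True once the context has stopped (e.g. because a streaming
    # job failed), so a failure aborts the loop instead of hanging the test
    # the way a plain time.sleep() would.
    if ssc.awaitTerminationOrTimeout(0.5):
        raise Exception("ssc stopped")
```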

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #17323 from zsxwing/SPARK-19986.

(cherry picked from commit 376d782164437573880f0ad58cecae1cb5f212f2)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
---
 python/pyspark/streaming/tests.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 5ac007cd59..080aa3b55d 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -903,11 +903,11 @@ class CheckpointTests(unittest.TestCase):
         def setup():
             conf = SparkConf().set("spark.default.parallelism", 1)
             sc = SparkContext(conf=conf)
-            ssc = StreamingContext(sc, 0.5)
+            ssc = StreamingContext(sc, 2)
             dstream = ssc.textFileStream(inputd).map(lambda x: (x, 1))
             wc = dstream.updateStateByKey(updater)
             wc.map(lambda x: "%s,%d" % x).saveAsTextFiles(outputd + "test")
-            wc.checkpoint(.5)
+            wc.checkpoint(2)
             self.setupCalled = True
             return ssc
 
@@ -921,21 +921,22 @@ class CheckpointTests(unittest.TestCase):
 
         def check_output(n):
             while not os.listdir(outputd):
-                time.sleep(0.01)
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
             time.sleep(1)  # make sure mtime is larger than the previous one
             with open(os.path.join(inputd, str(n)), 'w') as f:
                 f.writelines(["%d\n" % i for i in range(10)])
 
             while True:
+                if self.ssc.awaitTerminationOrTimeout(0.5):
+                    raise Exception("ssc stopped")
                 p = os.path.join(outputd, max(os.listdir(outputd)))
                 if '_SUCCESS' not in os.listdir(p):
                     # not finished
-                    time.sleep(0.01)
                     continue
                 ordd = self.ssc.sparkContext.textFile(p).map(lambda line: line.split(","))
                 d = ordd.values().map(int).collect()
                 if not d:
-                    time.sleep(0.01)
                     continue
                 self.assertEqual(10, len(d))
                 s = set(d)
-- 
GitLab