diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 975c75473214adc7c7b96ce946fd6f3cc2e6912c..8be56c99152654b72202af8b153e2dde25946631 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -218,7 +218,7 @@ class StreamingContext(object): @param timeout: time to wait in seconds """ - self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) + return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) def stop(self, stopSparkContext=True, stopGraceFully=False): """ diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index f7fa481d50235b6b7c21246d7eaf8f9e9c4fadf4..179479625bca4670e98ef00a7b28c22a1dfc5f90 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -596,6 +596,13 @@ class StreamingContextTests(PySparkStreamingTestCase): self.ssc = StreamingContext.getActiveOrCreate(None, setupFunc) self.assertTrue(self.setupCalled) + def test_await_termination_or_timeout(self): + self._add_input_stream() + self.ssc.start() + self.assertFalse(self.ssc.awaitTerminationOrTimeout(0.001)) + self.ssc.stop(False) + self.assertTrue(self.ssc.awaitTerminationOrTimeout(0.001)) + class CheckpointTests(unittest.TestCase):