From a355b667a3718d9c5d48a0781e836bf5418ab842 Mon Sep 17 00:00:00 2001
From: Felix Cheung <felixcheung_m@hotmail.com>
Date: Sun, 30 Apr 2017 23:23:49 -0700
Subject: [PATCH] [SPARK-20541][SPARKR][SS] support awaitTermination without
 timeout

## What changes were proposed in this pull request?

Add without param for timeout - will need this to submit a job that runs until stopped
Need this for 2.2

## How was this patch tested?

manually, unit test

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17815 from felixcheung/rssawaitinfinite.
---
 R/pkg/R/generics.R                         |  2 +-
 R/pkg/R/streaming.R                        | 14 ++++++++++----
 R/pkg/inst/tests/testthat/test_streaming.R |  1 +
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d4e4958dc0..ef36765a7a 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1518,7 +1518,7 @@ setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml")
 
 #' @rdname awaitTermination
 #' @export
-setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
+setGeneric("awaitTermination", function(x, timeout = NULL) { standardGeneric("awaitTermination") })
 
 #' @rdname isActive
 #' @export
diff --git a/R/pkg/R/streaming.R b/R/pkg/R/streaming.R
index e353d2dd07..8390bd5e6d 100644
--- a/R/pkg/R/streaming.R
+++ b/R/pkg/R/streaming.R
@@ -169,8 +169,10 @@ setMethod("isActive",
 #' immediately.
 #'
 #' @param x a StreamingQuery.
-#' @param timeout time to wait in milliseconds
-#' @return TRUE if query has terminated within the timeout period.
+#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely until \code{stopQuery}
+#'                is called or an error has occured.
+#' @return TRUE if query has terminated within the timeout period; nothing if timeout is not
+#'         specified.
 #' @rdname awaitTermination
 #' @name awaitTermination
 #' @aliases awaitTermination,StreamingQuery-method
@@ -182,8 +184,12 @@ setMethod("isActive",
 #' @note experimental
 setMethod("awaitTermination",
           signature(x = "StreamingQuery"),
-          function(x, timeout) {
-            handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
+          function(x, timeout = NULL) {
+            if (is.null(timeout)) {
+              invisible(handledCallJMethod(x@ssq, "awaitTermination"))
+            } else {
+              handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
+            }
           })
 
 #' stopQuery
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R
index 1f4054a84d..b125cb0591 100644
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ b/R/pkg/inst/tests/testthat/test_streaming.R
@@ -61,6 +61,7 @@ test_that("read.stream, write.stream, awaitTermination, stopQuery", {
 
   stopQuery(q)
   expect_true(awaitTermination(q, 1))
+  expect_error(awaitTermination(q), NA)
 })
 
 test_that("print from explain, lastProgress, status, isActive", {
-- 
GitLab