From fc92d25f2a27e81ef2d5031dcf856af1cc1d8c31 Mon Sep 17 00:00:00 2001 From: Felix Cheung <felixcheung@apache.org> Date: Wed, 28 Jun 2017 20:06:29 -0700 Subject: [PATCH] Revert "[SPARK-21094][R] Terminate R's worker processes in the parent of R's daemon to prevent a leak" This reverts commit 6b3d02285ee0debc73cbcab01b10398a498fbeb8. --- R/pkg/inst/worker/daemon.R | 59 +++----------------------------------- 1 file changed, 4 insertions(+), 55 deletions(-) diff --git a/R/pkg/inst/worker/daemon.R b/R/pkg/inst/worker/daemon.R index 6e385b2a27..3a318b71ea 100644 --- a/R/pkg/inst/worker/daemon.R +++ b/R/pkg/inst/worker/daemon.R @@ -30,55 +30,8 @@ port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT")) inputCon <- socketConnection( port = port, open = "rb", blocking = TRUE, timeout = connectionTimeout) -# Waits indefinitely for a socket connecion by default. -selectTimeout <- NULL - -# Exit code that children send to the parent to indicate they exited. -exitCode <- 1 - while (TRUE) { - ready <- socketSelect(list(inputCon), timeout = selectTimeout) - - # Note that the children should be terminated in the parent. If each child terminates - # itself, it appears that the resource is not released properly, that causes an unexpected - # termination of this daemon due to, for example, running out of file descriptors - # (see SPARK-21093). Therefore, the current implementation tries to retrieve children - # that are exited (but not terminated) and then sends a kill signal to terminate them properly - # in the parent. - # - # There are two paths that it attempts to send a signal to terminate the children in the parent. - # - # 1. Every second if any socket connection is not available and if there are child workers - # running. - # 2. Right after a socket connection is available. - # - # In other words, the parent attempts to send the signal to the children every second if - # any worker is running or right before launching other worker children from the following - # new socket connection. - - # Only the process IDs of children sent data to the parent are returned below. The children - # send a custom exit code to the parent after being exited and the parent tries - # to terminate them only if they sent the exit code. - children <- parallel:::selectChildren(timeout = 0) - - if (is.integer(children)) { - lapply(children, function(child) { - # This data should be raw bytes if any data was sent from this child. - # Otherwise, this returns the PID. - data <- parallel:::readChild(child) - if (is.raw(data)) { - # This checks if the data from this child is the exit code that indicates an exited child. - if (unserialize(data) == exitCode) { - # If so, we terminate this child. - tools::pskill(child, tools::SIGUSR1) - } - } - }) - } else if (is.null(children)) { - # If it is NULL, there are no children. Waits indefinitely for a socket connecion. - selectTimeout <- NULL - } - + ready <- socketSelect(list(inputCon)) if (ready) { port <- SparkR:::readInt(inputCon) # There is a small chance that it could be interrupted by signal, retry one time @@ -91,16 +44,12 @@ while (TRUE) { } p <- parallel:::mcfork() if (inherits(p, "masterProcess")) { - # Reach here because this is a child process. close(inputCon) Sys.setenv(SPARKR_WORKER_PORT = port) try(source(script)) - # Note that this mcexit does not fully terminate this child. So, this writes back - # a custom exit code so that the parent can read and terminate this child. - parallel:::mcexit(0L, send = exitCode) - } else { - # Forking succeeded and we need to check if they finished their jobs every second. - selectTimeout <- 1 + # Set SIGUSR1 so that child can exit + tools::pskill(Sys.getpid(), tools::SIGUSR1) + parallel:::mcexit(0L) } } } -- GitLab