From 28dcbb531ae57dc50f15ad9df6c31022731669c9 Mon Sep 17 00:00:00 2001
From: Davies Liu <davies.liu@gmail.com>
Date: Sun, 10 Aug 2014 13:00:38 -0700
Subject: [PATCH] [SPARK-2898] [PySpark] fix bugs in deamon.py

1. do not use signal handler for SIGCHILD, it's easy to cause deadlock
2. handle EINTR during accept()
3. pass errno into JVM
4. handle EAGAIN during fork()

Now, it can pass 50k tasks tests in 180 seconds.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] cleanup dead children every seconds
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in deamon.py
---
 .../api/python/PythonWorkerFactory.scala      |  2 +-
 python/pyspark/daemon.py                      | 78 +++++++++++--------
 2 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 7af260d0b7..bf716a8ab0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538baf0..22ab8d30c0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
         os._exit(compute_real_exit_code(exit_code))
 
 
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def manager():
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +141,41 @@ def manager():
                     pass  # process already died
 
             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                     else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                         sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                     sock.close()
+
     finally:
         shutdown(1)
 
-- 
GitLab