diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 7af260d0b7f26b7b4bcd3691c8f78b83ca4c939d..bf716a8ab025b516c8daa58d873dda1680ed0a53 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538baf0b93b8bb06f1e9e592e9c1940b86ab7..22ab8d30c0ae30dbcaf38cca45ca1789a9928db0 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
         os._exit(compute_real_exit_code(exit_code))
 
 
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def manager():
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +141,41 @@ def manager():
                     pass  # process already died
 
             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                     else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                         sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                     sock.close()
+
     finally:
         shutdown(1)