diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index be1a04d619c1b2a53682d56d38a2bd1d2e30b59f..7ccde2e8182f3a528609ef16f5ef16c666593ec4 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,5 +1,8 @@
 package spark
 
+import collection.mutable
+import serializer.Serializer
+
 import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
 import akka.remote.RemoteActorRefProvider
 
@@ -9,6 +12,7 @@ import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
 import spark.serializer.{Serializer, SerializerManager}
 import spark.util.AkkaUtils
+import spark.api.python.PythonWorkerFactory
 
 
 /**
@@ -37,7 +41,10 @@ class SparkEnv (
     // If executorId is NOT found, return defaultHostPort
     var executorIdToHostPort: Option[(String, String) => String]) {
 
+  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+
   def stop() {
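+    // Shut down the pooled Python worker daemons, if any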
+    pythonWorkers.foreach { case (key, worker) => worker.stop() }
     httpFileServer.stop()
     mapOutputTracker.stop()
     shuffleFetcher.stop()
@@ -50,6 +57,11 @@ class SparkEnv (
     actorSystem.awaitTermination()
   }
 
+  def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
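+    // Reuse a single PythonWorkerFactory per (pythonExec, envVars) pair; each factory
+    // manages one daemon.py process from which workers are forked on demand.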
+    synchronized {
+      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorkerFactory(pythonExec, envVars)).create()
+    }
+  }
 
   def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
     val env = SparkEnv.get
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 807119ca8c08a1430d3db582a16f02b148812983..63140cf37f5bb39a6bf2c9ec92bb166fbdee5715 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -2,10 +2,9 @@ package spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.io.Source
 
 import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import spark.broadcast.Broadcast
@@ -16,7 +15,7 @@ import spark.rdd.PipedRDD
 private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
     command: Seq[String],
-    envVars: java.util.Map[String, String],
+    envVars: JMap[String, String],
     preservePartitoning: Boolean,
     pythonExec: String,
     broadcastVars: JList[Broadcast[Array[Byte]]],
@@ -25,7 +24,7 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
   // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+  def this(parent: RDD[T], command: String, envVars: JMap[String, String],
       preservePartitoning: Boolean, pythonExec: String,
       broadcastVars: JList[Broadcast[Array[Byte]]],
       accumulator: Accumulator[JList[Array[Byte]]]) =
@@ -36,35 +35,18 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   override val partitioner = if (preservePartitoning) parent.partitioner else None
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
-
-    val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
-    // Add the environmental variables to the process.
-    val currentEnvVars = pb.environment()
-
-    for ((variable, value) <- envVars) {
-      currentEnvVars.put(variable, value)
-    }
 
-    val proc = pb.start()
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
+    val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-
-    // Start a thread to print the process's stderr to ours
-    new Thread("stderr reader for " + pythonExec) {
-      override def run() {
-        for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
+    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
 
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
         SparkEnv.set(env)
-        val out = new PrintWriter(proc.getOutputStream)
-        val dOut = new DataOutputStream(proc.getOutputStream)
+        val out = new PrintWriter(worker.getOutputStream)
+        val dOut = new DataOutputStream(worker.getOutputStream)
         // Partition index
         dOut.writeInt(split.index)
         // sparkFilesDir
@@ -88,16 +70,21 @@ private[spark] class PythonRDD[T: ClassManifest](
         }
         dOut.flush()
         out.flush()
-        proc.getOutputStream.close()
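+        // Half-close the socket: the worker sees end-of-input and starts sending
+        // results, while our read side of the connection stays open.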
+        worker.shutdownOutput()
       }
     }.start()
 
-    // Return an iterator that read lines from the process's stdout
+    // Return an iterator that reads the worker's output
-    val stream = new DataInputStream(proc.getInputStream)
+    val stream = new DataInputStream(worker.getInputStream)
     return new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
-        _nextObj = read()
+        if (hasNext) {
+          // FIXME: can deadlock if the worker is waiting for us to
+          // respond to the current message (currently irrelevant because
+          // output is shut down before we read any input)
+          _nextObj = read()
+        }
         obj
       }
 
@@ -108,6 +95,17 @@ private[spark] class PythonRDD[T: ClassManifest](
               val obj = new Array[Byte](length)
               stream.readFully(obj)
               obj
+            case -3 =>
+              // Timing data from worker
+              val bootTime = stream.readLong()
+              val initTime = stream.readLong()
+              val finishTime = stream.readLong()
+              val boot = bootTime - startTime
+              val init = initTime - bootTime
+              val finish = finishTime - initTime
+              val total = finishTime - startTime
+              logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot, init, finish))
+              read
             case -2 =>
               // Signals that an exception has been thrown in python
               val exLength = stream.readInt()
@@ -115,23 +113,21 @@ private[spark] class PythonRDD[T: ClassManifest](
               stream.readFully(obj)
               throw new PythonException(new String(obj))
             case -1 =>
-              // We've finished the data section of the output, but we can still read some
-              // accumulator updates; let's do that, breaking when we get EOFException
-              while (true) {
-                val len2 = stream.readInt()
+              // We've finished the data section of the output, but we can still
+              // read some accumulator updates; let's do that, breaking when we
+              // get a negative length record.
+              var len2 = stream.readInt()
+              while (len2 >= 0) {
                 val update = new Array[Byte](len2)
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
+                len2 = stream.readInt()
               }
               new Array[Byte](0)
           }
         } catch {
           case eof: EOFException => {
-            val exitStatus = proc.waitFor()
-            if (exitStatus != 0) {
-              throw new Exception("Subprocess exited with status " + exitStatus)
-            }
-            new Array[Byte](0)
+            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
           }
           case e => throw e
         }
@@ -159,7 +155,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (a, b)
-      case x          => throw new Exception("PairwiseRDD: unexpected value: " + x)
+      case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
     }
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
@@ -215,7 +211,7 @@ private[spark] object PythonRDD {
       dOut.write(s)
       dOut.writeByte(Pickle.STOP)
     } else {
-      throw new Exception("Unexpected RDD type")
+      throw new SparkException("Unexpected RDD type")
     }
   }
 
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8844411d738af803d34fceebeba8678628840b3b
--- /dev/null
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -0,0 +1,95 @@
+package spark.api.python
+
+import java.io.{DataInputStream, IOException}
+import java.net.{Socket, SocketException, InetAddress}
+
+import scala.collection.JavaConversions._
+
+import spark._
+
+private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
+    extends Logging {
+  var daemon: Process = null
+  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+  var daemonPort: Int = 0
+
+  def create(): Socket = {
+    synchronized {
+      // Start the daemon if it hasn't been started
+      startDaemon()
+
+      // Attempt to connect, restart and retry once if it fails
+      try {
+        new Socket(daemonHost, daemonPort)
+      } catch {
+        case exc: SocketException => {
+          logWarning("Python daemon unexpectedly quit, attempting to restart")
+          stopDaemon()
+          startDaemon()
+          new Socket(daemonHost, daemonPort)
+        }
+        case e => throw e
+      }
+    }
+  }
+
+  def stop() {
+    stopDaemon()
+  }
+
+  private def startDaemon() {
+    synchronized {
+      // Is it already running?
+      if (daemon != null) {
+        return
+      }
+
+      try {
+        // Create and start the daemon
+        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+        val workerEnv = pb.environment()
+        workerEnv.putAll(envVars)
+        daemon = pb.start()
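+        // The daemon reports the port it listens on by writing it to stdout as a
+        // big-endian 32-bit int (write_int in pyspark/serializers.py)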
+        daemonPort = new DataInputStream(daemon.getInputStream).readInt()
+
+        // Redirect the stderr to ours
+        new Thread("stderr reader for " + pythonExec) {
+          override def run() {
+            scala.util.control.Exception.ignoring(classOf[IOException]) {
+              // FIXME HACK: copy the stream at the byte level to
+              // sidestep encoding problems.
+              val in = daemon.getErrorStream
+              var buf = new Array[Byte](1024)
+              var len = in.read(buf)
+              while (len != -1) {
+                System.err.write(buf, 0, len)
+                len = in.read(buf)
+              }
+            }
+          }
+        }.start()
+      } catch {
+        case e => {
+          stopDaemon()
+          throw e
+        }
+      }
+
+      // Important: don't close daemon's stdin (daemon.getOutputStream) so it can correctly
+      // detect our disappearance.
+    }
+  }
+
+  private def stopDaemon() {
+    synchronized {
+      // Request shutdown of existing daemon by sending SIGTERM
+      if (daemon != null) {
+        daemon.destroy()
+      }
+
+      daemon = null
+      daemonPort = 0
+    }
+  }
+}
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
new file mode 100644
index 0000000000000000000000000000000000000000..78a2da1e18ea497df35cc6dd6900e65e1c3619c3
--- /dev/null
+++ b/python/pyspark/daemon.py
@@ -0,0 +1,158 @@
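+"""
+Pooled Python worker daemon: the manager prelaunches a pool of worker processes
+and listens on a loopback socket, so the JVM can obtain a worker by connecting
+instead of forking a fresh interpreter per task.  The manager exits when its
+stdin is closed or when it receives SIGTERM.
+"""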
+import os
+import sys
+import multiprocessing
+from ctypes import c_bool
+from errno import EINTR, ECHILD
+from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN
+from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
+from pyspark.worker import main as worker_main
+from pyspark.serializers import write_int
+
+try:
+    POOLSIZE = multiprocessing.cpu_count()
+except NotImplementedError:
+    POOLSIZE = 4
+
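+# Shared with forked workers; set by the manager to request shutdown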
+exit_flag = multiprocessing.Value(c_bool, False)
+
+
+def should_exit():
+    global exit_flag
+    return exit_flag.value
+
+
+def compute_real_exit_code(exit_code):
+    # SystemExit's code can be an integer or a string, but os._exit only accepts integers
+    import numbers
+    if isinstance(exit_code, numbers.Integral):
+        return exit_code
+    else:
+        return 1
+
+
+def worker(listen_sock):
+    # Redirect stdout to stderr
+    os.dup2(2, 1)
+
+    # Manager sends SIGHUP to request termination of workers in the pool
+    def handle_sighup(*args):
+        assert should_exit()
+    signal(SIGHUP, handle_sighup)
+
+    # Cleanup zombie children
+    def handle_sigchld(*args):
+        pid = status = None
+        try:
+            while (pid, status) != (0, 0):
+                pid, status = os.waitpid(0, os.WNOHANG)
+        except EnvironmentError as err:
+            if err.errno == EINTR:
+                # retry
+                handle_sigchld()
+            elif err.errno != ECHILD:
+                raise
+    signal(SIGCHLD, handle_sigchld)
+
+    # Handle clients
+    while not should_exit():
+        # Wait until a client arrives or we have to exit
+        sock = None
+        while not should_exit() and sock is None:
+            try:
+                sock, addr = listen_sock.accept()
+            except EnvironmentError as err:
+                if err.errno != EINTR:
+                    raise
+
+        if sock is not None:
+            # Fork a child to handle the client.
+            # The client is handled in the child so that the manager
+            # never receives SIGCHLD unless a worker crashes.
+            if os.fork() == 0:
+                # Leave the worker pool
+                signal(SIGHUP, SIG_DFL)
+                listen_sock.close()
+                # Handle the client then exit
+                sockfile = sock.makefile()
+                exit_code = 0
+                try:
+                    worker_main(sockfile, sockfile)
+                except SystemExit as exc:
+                    exit_code = exc.code
+                finally:
+                    sockfile.close()
+                    sock.close()
+                    os._exit(compute_real_exit_code(exit_code))
+            else:
+                sock.close()
+
+
+def launch_worker(listen_sock):
+    if os.fork() == 0:
+        try:
+            worker(listen_sock)
+        except Exception as err:
+            import traceback
+            traceback.print_exc()
+            os._exit(1)
+        else:
+            assert should_exit()
+            os._exit(0)
+
+
+def manager():
+    # Create a new process group to corral our children
+    os.setpgid(0, 0)
+
+    # Create a listening socket on the AF_INET loopback interface
+    listen_sock = socket(AF_INET, SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
+    listen_host, listen_port = listen_sock.getsockname()
+    write_int(listen_port, sys.stdout)
+
+    # Launch initial worker pool
+    for idx in range(POOLSIZE):
+        launch_worker(listen_sock)
+    listen_sock.close()
+
+    def shutdown():
+        global exit_flag
+        exit_flag.value = True
+
+    # Gracefully exit on SIGTERM, don't die on SIGHUP
+    signal(SIGTERM, lambda signum, frame: shutdown())
+    signal(SIGHUP, SIG_IGN)
+
+    # Cleanup zombie children
+    def handle_sigchld(*args):
+        try:
+            pid, status = os.waitpid(0, os.WNOHANG)
+            if status != 0 and not should_exit():
+                raise RuntimeError("worker crashed: %s, %s" % (pid, status))
+        except EnvironmentError as err:
+            if err.errno not in (ECHILD, EINTR):
+                raise
+    signal(SIGCHLD, handle_sigchld)
+
+    # Initialization complete
+    sys.stdout.close()
+    try:
+        while not should_exit():
+            try:
+                # Spark tells us to exit by closing stdin
+                if os.read(0, 512) == '':
+                    shutdown()
+            except EnvironmentError as err:
+                if err.errno != EINTR:
+                    shutdown()
+                    raise
+    finally:
+        signal(SIGTERM, SIG_DFL)
+        exit_flag.value = True
+        # Send SIGHUP to notify workers of shutdown
+        os.kill(0, SIGHUP)
+
+
+if __name__ == '__main__':
+    manager()
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 115cf28cc2ba461754530a8c3e7da33c6e7c3ed9..5a95144983aeca2cd59bf4c51eed0933e5ea8e14 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -46,6 +46,10 @@ def read_long(stream):
     return struct.unpack("!q", length)[0]
 
 
+def write_long(value, stream):
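+    # Big-endian signed 64-bit, the counterpart of read_long above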
+    stream.write(struct.pack("!q", value))
+
+
 def read_int(stream):
     length = stream.read(4)
     if length == "":
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 6a1962d26795dfe950049de5b3a07b3d7f12354a..1e34d473650bf03ba93fb5a509d49e831467ed4d 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -12,6 +12,7 @@ import unittest
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import SPARK_HOME
+from pyspark.serializers import read_int
 
 
 class PySparkTestCase(unittest.TestCase):
@@ -117,5 +118,47 @@ class TestIO(PySparkTestCase):
         self.sc.parallelize([1]).foreach(func)
 
 
+class TestDaemon(unittest.TestCase):
+    def connect(self, port):
+        from socket import socket, AF_INET, SOCK_STREAM
+        sock = socket(AF_INET, SOCK_STREAM)
+        sock.connect(('127.0.0.1', port))
+        # send a split index of -1 to shut down the worker
+        sock.send("\xFF\xFF\xFF\xFF")
+        sock.close()
+        return True
+
+    def do_termination_test(self, terminator):
+        from subprocess import Popen, PIPE
+        from errno import ECONNREFUSED
+
+        # start daemon
+        daemon_path = os.path.join(os.path.dirname(__file__), "daemon.py")
+        daemon = Popen([sys.executable, daemon_path], stdin=PIPE, stdout=PIPE)
+
+        # read the port number
+        port = read_int(daemon.stdout)
+
+        # daemon should accept connections
+        self.assertTrue(self.connect(port))
+
+        # request shutdown
+        terminator(daemon)
+        time.sleep(1)
+
+        # daemon should no longer accept connections
+        with self.assertRaises(EnvironmentError) as trap:
+            self.connect(port)
+        self.assertEqual(trap.exception.errno, ECONNREFUSED)
+
+    def test_termination_stdin(self):
+        """Ensure that daemon and workers terminate when stdin is closed."""
+        self.do_termination_test(lambda daemon: daemon.stdin.close())
+
+    def test_termination_sigterm(self):
+        """Ensure that daemon and workers terminate on SIGTERM."""
+        from signal import SIGTERM
+        self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM))
+
 if __name__ == "__main__":
     unittest.main()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 812e7a9da5a030875cf8d632c7e2040285030599..379bbfd4c2cfee7d9ed93ee935e4f6da4194120f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -3,6 +3,7 @@ Worker that receives input from Piped RDD.
 """
 import os
 import sys
+import time
 import traceback
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
@@ -12,48 +13,60 @@ from pyspark.broadcast import Broadcast, _broadcastRegistry
 from pyspark.cloudpickle import CloudPickler
 from pyspark.files import SparkFiles
 from pyspark.serializers import write_with_length, read_with_length, write_int, \
-    read_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
+    read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
 
 
-# Redirect stdout to stderr so that users must return values from functions.
-old_stdout = os.fdopen(os.dup(1), 'w')
-os.dup2(2, 1)
+def load_obj(infile):
+    return load_pickle(standard_b64decode(infile.readline().strip()))
 
 
-def load_obj():
-    return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+def report_times(outfile, boot, init, finish):
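+    # -3 marks a timing record; times are reported in milliseconds since the epoch
+    # so they can be compared with System.currentTimeMillis on the JVM side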
+    write_int(-3, outfile)
+    write_long(1000 * boot, outfile)
+    write_long(1000 * init, outfile)
+    write_long(1000 * finish, outfile)
 
 
-def main():
-    split_index = read_int(sys.stdin)
-    spark_files_dir = load_pickle(read_with_length(sys.stdin))
+def main(infile, outfile):
+    boot_time = time.time()
+    split_index = read_int(infile)
+    if split_index == -1:  # for unit tests
+        return
+    spark_files_dir = load_pickle(read_with_length(infile))
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
     sys.path.append(spark_files_dir)
-    num_broadcast_variables = read_int(sys.stdin)
+    num_broadcast_variables = read_int(infile)
     for _ in range(num_broadcast_variables):
-        bid = read_long(sys.stdin)
-        value = read_with_length(sys.stdin)
+        bid = read_long(infile)
+        value = read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
-    func = load_obj()
-    bypassSerializer = load_obj()
+    func = load_obj(infile)
+    bypassSerializer = load_obj(infile)
     if bypassSerializer:
         dumps = lambda x: x
     else:
         dumps = dump_pickle
-    iterator = read_from_pickle_file(sys.stdin)
+    init_time = time.time()
+    iterator = read_from_pickle_file(infile)
     try:
         for obj in func(split_index, iterator):
-           write_with_length(dumps(obj), old_stdout)
+            write_with_length(dumps(obj), outfile)
     except Exception as e:
-        write_int(-2, old_stdout)
-        write_with_length(traceback.format_exc(), old_stdout)
+        write_int(-2, outfile)
+        write_with_length(traceback.format_exc(), outfile)
         sys.exit(-1)
+    finish_time = time.time()
+    report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
-    write_int(-1, old_stdout)
+    write_int(-1, outfile)
     for aid, accum in _accumulatorRegistry.items():
-        write_with_length(dump_pickle((aid, accum._value)), old_stdout)
+        write_with_length(dump_pickle((aid, accum._value)), outfile)
+    write_int(-1, outfile)
 
 
 if __name__ == '__main__':
-    main()
+    # Redirect stdout to stderr so that users must return values from functions.
+    old_stdout = os.fdopen(os.dup(1), 'w')
+    os.dup2(2, 1)
+    main(sys.stdin, old_stdout)