diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index be1a04d619c1b2a53682d56d38a2bd1d2e30b59f..5691e24c320f678e3ce23d79cc70ba3ddb7aede8 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -1,5 +1,8 @@
 package spark
 
+import scala.collection.mutable
+
 import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
 import akka.remote.RemoteActorRefProvider
 
@@ -9,6 +12,7 @@ import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
 import spark.serializer.{Serializer, SerializerManager}
 import spark.util.AkkaUtils
+import spark.api.python.PythonWorker
 
 
 /**
@@ -37,6 +41,8 @@ class SparkEnv (
     // If executorId is NOT found, return defaultHostPort
     var executorIdToHostPort: Option[(String, String) => String]) {
 
+  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorker]()
+
   def stop() {
     httpFileServer.stop()
     mapOutputTracker.stop()
@@ -50,6 +56,11 @@ class SparkEnv (
     actorSystem.awaitTermination()
   }
 
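+  // Returns the cached PythonWorker for the given Python executable and
+  // environment, creating it on first use, so that tasks reuse one daemon
+  // per configuration instead of launching a new Python process every time.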
+  def getPythonWorker(pythonExec: String, envVars: Map[String, String]): PythonWorker = {
+    synchronized {
+      pythonWorkers.getOrElseUpdate((pythonExec, envVars), new PythonWorker(pythonExec, envVars))
+    }
+  }
 
   def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
     val env = SparkEnv.get
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index e9978d713fe4a6b4bdeac5f0ced1e3000035f808..e5acc54c016aa319e38f6360fb3fdd116c779642 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -2,10 +2,9 @@ package spark.api.python
 
 import java.io._
 import java.net._
-import java.util.{List => JList, ArrayList => JArrayList, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
-import scala.io.Source
 
 import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import spark.broadcast.Broadcast
@@ -16,7 +15,7 @@ import spark.rdd.PipedRDD
 private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
     command: Seq[String],
-    envVars: java.util.Map[String, String],
+    envVars: JMap[String, String],
     preservePartitoning: Boolean,
     pythonExec: String,
     broadcastVars: JList[Broadcast[Array[Byte]]],
@@ -25,7 +24,7 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   // Similar to Runtime.exec(), if we are given a single string, split it into words
   // using a standard StringTokenizer (i.e. by spaces)
-  def this(parent: RDD[T], command: String, envVars: java.util.Map[String, String],
+  def this(parent: RDD[T], command: String, envVars: JMap[String, String],
       preservePartitoning: Boolean, pythonExec: String,
       broadcastVars: JList[Broadcast[Array[Byte]]],
       accumulator: Accumulator[JList[Array[Byte]]]) =
@@ -36,36 +35,18 @@ private[spark] class PythonRDD[T: ClassManifest](
 
   override val partitioner = if (preservePartitoning) parent.partitioner else None
 
-  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
-    val SPARK_HOME = new ProcessBuilder().environment().get("SPARK_HOME")
-
-    val pb = new ProcessBuilder(Seq(pythonExec, SPARK_HOME + "/python/pyspark/worker.py"))
-    // Add the environmental variables to the process.
-    val currentEnvVars = pb.environment()
-
-    for ((variable, value) <- envVars) {
-      currentEnvVars.put(variable, value)
-    }
 
+  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
-    val proc = pb.start()
+    val worker = SparkEnv.get.getPythonWorker(pythonExec, envVars.toMap).create()
     val env = SparkEnv.get
 
-    // Start a thread to print the process's stderr to ours
-    new Thread("stderr reader for " + pythonExec) {
-      override def run() {
-        for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
         SparkEnv.set(env)
-        val out = new PrintWriter(proc.getOutputStream)
-        val dOut = new DataOutputStream(proc.getOutputStream)
+        val out = new PrintWriter(worker.getOutputStream)
+        val dOut = new DataOutputStream(worker.getOutputStream)
         // Partition index
         dOut.writeInt(split.index)
         // sparkFilesDir
@@ -89,16 +70,21 @@ private[spark] class PythonRDD[T: ClassManifest](
         }
         dOut.flush()
         out.flush()
-        proc.getOutputStream.close()
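+        // Half-close the socket so the Python worker sees end-of-input once
+        // it has consumed everything we sent.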
+        worker.shutdownOutput()
       }
     }.start()
 
-    // Return an iterator that read lines from the process's stdout
+    // Return an iterator that reads length-prefixed byte arrays from the worker's socket
-    val stream = new DataInputStream(proc.getInputStream)
+    val stream = new DataInputStream(worker.getInputStream)
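+    // Records on this stream are a 4-byte length followed by that many bytes.
+    // Negative lengths are control codes: -3 precedes timing data, -2 a
+    // Python exception traceback, and -1 marks the end of the data section,
+    // after which length-prefixed accumulator updates follow.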
     return new Iterator[Array[Byte]] {
       def next(): Array[Byte] = {
         val obj = _nextObj
-        _nextObj = read()
+        if (hasNext) {
+          // FIXME: can deadlock if the worker is waiting for us to respond
+          // to the current message (currently irrelevant because output is
+          // shut down before we read any input)
+          _nextObj = read()
+        }
         obj
       }
 
@@ -110,7 +96,7 @@ private[spark] class PythonRDD[T: ClassManifest](
               stream.readFully(obj)
               obj
             case -3 =>
-              // Timing data from child
+              // Timing data from worker
               val bootTime = stream.readLong()
               val initTime = stream.readLong()
               val finishTime = stream.readLong()
@@ -127,23 +113,21 @@ private[spark] class PythonRDD[T: ClassManifest](
               stream.readFully(obj)
               throw new PythonException(new String(obj))
             case -1 =>
-              // We've finished the data section of the output, but we can still read some
-              // accumulator updates; let's do that, breaking when we get EOFException
-              while (true) {
-                val len2 = stream.readInt()
+              // We've finished the data section of the output, but we can still
+              // read some accumulator updates; let's do that, breaking when we
+              // get a negative length record.
+              var len2 = stream.readInt()
+              while (len2 >= 0) {
                 val update = new Array[Byte](len2)
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
+                len2 = stream.readInt()
               }
               new Array[Byte](0)
           }
         } catch {
           case eof: EOFException => {
-            val exitStatus = proc.waitFor()
-            if (exitStatus != 0) {
-              throw new Exception("Subprocess exited with status " + exitStatus)
-            }
-            new Array[Byte](0)
+            throw new SparkException("Python worker exited unexpectedly (crashed)", eof)
           }
           case e => throw e
         }
@@ -171,7 +155,7 @@ private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
   override def compute(split: Partition, context: TaskContext) =
     prev.iterator(split, context).grouped(2).map {
       case Seq(a, b) => (a, b)
-      case x          => throw new Exception("PairwiseRDD: unexpected value: " + x)
+      case x          => throw new SparkException("PairwiseRDD: unexpected value: " + x)
     }
   val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
 }
@@ -227,7 +211,7 @@ private[spark] object PythonRDD {
       dOut.write(s)
       dOut.writeByte(Pickle.STOP)
     } else {
-      throw new Exception("Unexpected RDD type")
+      throw new SparkException("Unexpected RDD type")
     }
   }
 
diff --git a/core/src/main/scala/spark/api/python/PythonWorker.scala b/core/src/main/scala/spark/api/python/PythonWorker.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8ee3c6884f98f0998bd44a5ad43042f79c00f073
--- /dev/null
+++ b/core/src/main/scala/spark/api/python/PythonWorker.scala
@@ -0,0 +1,89 @@
+package spark.api.python
+
+import java.io.DataInputStream
+import java.net.{Socket, SocketException, InetAddress}
+
+import scala.collection.JavaConversions._
+
+import spark._
+
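+/**
+ * Manages a pyspark/daemon.py process for the given Python executable and
+ * environment. create() starts the daemon on first use and returns a new
+ * socket connected to one of its pooled workers, restarting the daemon and
+ * retrying once if the connection fails.
+ */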
+private[spark] class PythonWorker(pythonExec: String, envVars: Map[String, String])
+    extends Logging {
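+  // Handle to the daemon process and the port it listens on; mutated only
+  // while holding this object's lock.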
+  var daemon: Process = null
+  val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
+  var daemonPort: Int = 0
+
+  def create(): Socket = {
+    synchronized {
+      // Start the daemon if it hasn't been started
+      startDaemon()
+
+      // Attempt to connect, restart and retry once if it fails
+      try {
+        new Socket(daemonHost, daemonPort)
+      } catch {
+        case exc: SocketException => {
+          logWarning("Python daemon unexpectedly quit, attempting to restart")
+          stopDaemon()
+          startDaemon()
+          new Socket(daemonHost, daemonPort)
+        }
+        case e => throw e
+      }
+    }
+  }
+
+  private def startDaemon() {
+    synchronized {
+      // Is it already running?
+      if (daemon != null) {
+        return
+      }
+
+      try {
+        // Create and start the daemon
+        val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+        val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+        val workerEnv = pb.environment()
+        workerEnv.putAll(envVars)
+        daemon = pb.start()
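+        // The daemon reports the port it is listening on by writing it to stdout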
+        daemonPort = new DataInputStream(daemon.getInputStream).readInt()
+
+        // Redirect the stderr to ours
+        new Thread("stderr reader for " + pythonExec) {
+          override def run() {
+            // FIXME HACK: copy the stream at the byte level to avoid
+            // encoding problems.
+            val in = daemon.getErrorStream
+            var buf = new Array[Byte](1024)
+            var len = in.read(buf)
+            while (len != -1) {
+              System.err.write(buf, 0, len)
+              len = in.read(buf)
+            }
+          }
+        }.start()
+      } catch {
+        case e => {
+          stopDaemon()
+          throw e
+        }
+      }
+
+      // Important: don't close the daemon's stdin (daemon.getOutputStream);
+      // the daemon detects that we have exited by reading EOF on its stdin.
+    }
+  }
+
+  private def stopDaemon() {
+    synchronized {
+      // Request shutdown of existing daemon by sending SIGTERM
+      if (daemon != null) {
+        daemon.destroy()
+      }
+
+      daemon = null
+      daemonPort = 0
+    }
+  }
+}
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
new file mode 100644
index 0000000000000000000000000000000000000000..642f30b2b9644aff7f105230f57cb56a9639bc30
--- /dev/null
+++ b/python/pyspark/daemon.py
@@ -0,0 +1,109 @@
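+"""
+Manager for a pool of Python worker processes.
+
+The JVM launches this daemon once per (python executable, environment) via
+PythonWorker, reads the daemon's listening port from its stdout, and then
+opens a socket per task instead of forking a fresh Python interpreter for
+each one.
+"""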
+import os
+import sys
+import multiprocessing
+from errno import EINTR, ECHILD
+from socket import socket, AF_INET, SOCK_STREAM, SOMAXCONN
+from signal import signal, SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
+from pyspark.worker import main as worker_main
+from pyspark.serializers import write_int
+
+try:
+    POOLSIZE = multiprocessing.cpu_count()
+except NotImplementedError:
+    POOLSIZE = 4
+
+should_exit = False
+
+
+def worker(listen_sock):
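+    """Accept connections from Spark tasks and fork a child to serve each one."""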
+    # Redirect stdout to stderr
+    os.dup2(2, 1)
+
+    # Manager sends SIGHUP to request termination of workers in the pool
+    def handle_sighup(signum, frame):
+        global should_exit
+        should_exit = True
+    signal(SIGHUP, handle_sighup)
+
+    while not should_exit:
+        # Wait until a client arrives or we have to exit
+        sock = None
+        while not should_exit and sock is None:
+            try:
+                sock, addr = listen_sock.accept()
+            except EnvironmentError as err:
+                if err.errno != EINTR:
+                    raise
+
+        if sock is not None:
+            # Fork a child to handle the client
+            if os.fork() == 0:
+                # Leave the worker pool
+                signal(SIGHUP, SIG_DFL)
+                listen_sock.close()
+                # Handle the client then exit
+                sockfile = sock.makefile()
+                worker_main(sockfile, sockfile)
+                sockfile.close()
+                sock.close()
+                os._exit(0)
+            else:
+                sock.close()
+
+    assert should_exit
+    os._exit(0)
+
+
+def manager():
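+    """Bind a listening socket, report its port on stdout, and maintain the worker pool."""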
+    # Create a new process group to corral our children
+    os.setpgid(0, 0)
+
+    # Create a listening socket on the AF_INET loopback interface
+    listen_sock = socket(AF_INET, SOCK_STREAM)
+    listen_sock.bind(('127.0.0.1', 0))
+    listen_sock.listen(max(1024, 2 * POOLSIZE, SOMAXCONN))
+    listen_host, listen_port = listen_sock.getsockname()
+    write_int(listen_port, sys.stdout)
+
+    # Launch initial worker pool
+    for idx in range(POOLSIZE):
+        if os.fork() == 0:
+            worker(listen_sock)
+            raise RuntimeError("worker() unexpectedly returned")
+    listen_sock.close()
+
+    def shutdown():
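+        # SIGHUP the whole process group so the pool workers exit; the
+        # manager itself ignores SIGHUP (see below).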
+        global should_exit
+        os.kill(0, SIGHUP)
+        should_exit = True
+
+    # Gracefully exit on SIGTERM, don't die on SIGHUP
+    signal(SIGTERM, lambda signum, frame: shutdown())
+    signal(SIGHUP, SIG_IGN)
+
+    # Clean up zombie children
+    def handle_sigchld(signum, frame):
+        try:
+            pid, status = os.waitpid(0, os.WNOHANG)
+            if (pid, status) != (0, 0) and not should_exit:
+                raise RuntimeError("pool member crashed: %s, %s" % (pid, status))
+        except EnvironmentError as err:
+            if err.errno not in (ECHILD, EINTR):
+                raise
+    signal(SIGCHLD, handle_sigchld)
+
+    # Initialization complete
+    sys.stdout.close()
+    while not should_exit:
+        try:
+            # Spark tells us to exit by closing stdin
+            if sys.stdin.read() == '':
+                shutdown()
+        except EnvironmentError as err:
+            if err.errno != EINTR:
+                shutdown()
+                raise
+
+
+if __name__ == '__main__':
+    manager()
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 4c33ae49dcc95a4bf0b2aab3d5854653ea9882eb..94d612ea6ed926c42f44f0c57f91078b3aa2afc3 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1,10 +1,9 @@
 """
-Worker that receives input from Piped RDD.
+Worker that executes a single PySpark task, reading input from and writing
+results to the file objects passed to main().
 """
-import time
-preboot_time = time.time()
 import os
 import sys
+import time
 import traceback
 from base64 import standard_b64decode
 # CloudPickler needs to be imported so that depicklers are registered using the
@@ -17,57 +16,55 @@ from pyspark.serializers import write_with_length, read_with_length, write_int,
     read_long, write_long, read_int, dump_pickle, load_pickle, read_from_pickle_file
 
 
-# Redirect stdout to stderr so that users must return values from functions.
-old_stdout = os.fdopen(os.dup(1), 'w')
-os.dup2(2, 1)
-
-
-def load_obj():
-    return load_pickle(standard_b64decode(sys.stdin.readline().strip()))
+def load_obj(infile):
+    return load_pickle(standard_b64decode(infile.readline().strip()))
 
 
-def report_times(preboot, boot, init, finish):
-    write_int(-3, old_stdout)
-    write_long(1000 * preboot, old_stdout)
-    write_long(1000 * boot, old_stdout)
-    write_long(1000 * init, old_stdout)
-    write_long(1000 * finish, old_stdout)
+def report_times(outfile, boot, init, finish):
+    write_int(-3, outfile)
+    write_long(1000 * boot, outfile)
+    write_long(1000 * init, outfile)
+    write_long(1000 * finish, outfile)
 
 
-def main():
+def main(infile, outfile):
     boot_time = time.time()
-    split_index = read_int(sys.stdin)
-    spark_files_dir = load_pickle(read_with_length(sys.stdin))
+    split_index = read_int(infile)
+    spark_files_dir = load_pickle(read_with_length(infile))
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True
     sys.path.append(spark_files_dir)
-    num_broadcast_variables = read_int(sys.stdin)
+    num_broadcast_variables = read_int(infile)
     for _ in range(num_broadcast_variables):
-        bid = read_long(sys.stdin)
-        value = read_with_length(sys.stdin)
+        bid = read_long(infile)
+        value = read_with_length(infile)
         _broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
-    func = load_obj()
-    bypassSerializer = load_obj()
+    func = load_obj(infile)
+    bypassSerializer = load_obj(infile)
     if bypassSerializer:
         dumps = lambda x: x
     else:
         dumps = dump_pickle
     init_time = time.time()
-    iterator = read_from_pickle_file(sys.stdin)
+    iterator = read_from_pickle_file(infile)
     try:
         for obj in func(split_index, iterator):
-           write_with_length(dumps(obj), old_stdout)
+            write_with_length(dumps(obj), outfile)
     except Exception as e:
-        write_int(-2, old_stdout)
-        write_with_length(traceback.format_exc(), old_stdout)
-        sys.exit(-1)
+        write_int(-2, outfile)
+        write_with_length(traceback.format_exc(), outfile)
+        raise
     finish_time = time.time()
-    report_times(preboot_time, boot_time, init_time, finish_time)
+    report_times(outfile, boot_time, init_time, finish_time)
     # Mark the beginning of the accumulators section of the output
-    write_int(-1, old_stdout)
+    write_int(-1, outfile)
     for aid, accum in _accumulatorRegistry.items():
-        write_with_length(dump_pickle((aid, accum._value)), old_stdout)
+        write_with_length(dump_pickle((aid, accum._value)), outfile)
+    write_int(-1, outfile)
 
 
 if __name__ == '__main__':
-    main()
+    # Redirect stdout to stderr so that users must return values from functions.
+    old_stdout = os.fdopen(os.dup(1), 'w')
+    os.dup2(2, 1)
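+    # When run directly, the worker speaks its protocol over stdin/stdout;
+    # daemon.py instead passes main() a socket file object for each client.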
+    main(sys.stdin, old_stdout)