diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index 1dd5e2503317986408ff7574133c2675589d6a8e..98d757e116dcc4b74c603f82c889697d0e6a7846 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -2,7 +2,7 @@ package spark
 
 import java.io.{File, FileOutputStream}
 import java.net.{URI, URL, URLClassLoader}
-import java.util.concurrent.{Executors, ExecutorService}
+import java.util.concurrent._
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -34,7 +34,7 @@ class Executor extends mesos.Executor with Logging {
     Thread.currentThread.setContextClassLoader(classLoader)
     
     // Start worker thread pool (they will inherit our context ClassLoader)
-    threadPool = Executors.newCachedThreadPool()
+    threadPool = new ThreadPoolExecutor(1, 128, 600, TimeUnit.SECONDS, new LinkedBlockingQueue[Runnable])
   }
   
   override def launchTask(d: ExecutorDriver, desc: TaskDescription) {
diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala
index 462dee217e4a36a6a9b740de7fbff00c0ad5e6c5..54427ecf71b42d0b228789ac3d9d199d30ab825b 100644
--- a/core/src/main/scala/spark/KryoSerialization.scala
+++ b/core/src/main/scala/spark/KryoSerialization.scala
@@ -58,10 +58,8 @@ object ZigZag {
   }
 }
 
-class KryoSerializationStream(kryo: Kryo, out: OutputStream)
+class KryoSerializationStream(kryo: Kryo, buf: ByteBuffer, out: OutputStream)
 extends SerializationStream {
-  val buf = ByteBuffer.allocateDirect(1024*1024)
-
   def writeObject[T](t: T) {
     kryo.writeClassAndObject(buf, t)
     ZigZag.writeInt(buf.position(), out)
@@ -74,10 +72,8 @@ extends SerializationStream {
   def close() { out.close() }
 }
 
-class KryoDeserializationStream(kryo: Kryo, in: InputStream)
+class KryoDeserializationStream(buf: ObjectBuffer, in: InputStream)
 extends DeserializationStream {
-  val buf = new ObjectBuffer(kryo, 1024*1024)
-
   def readObject[T](): T = {
     val len = ZigZag.readInt(in)
     buf.readClassAndObject(in, len).asInstanceOf[T]
@@ -86,8 +82,8 @@ extends DeserializationStream {
   def close() { in.close() }
 }
 
-class KryoSerializer(kryo: Kryo) extends Serializer {
-  val buf = new ObjectBuffer(kryo, 1024*1024)
+class KryoSerializer(strat: KryoSerialization) extends Serializer {
+  val buf = strat.threadBuf.get()
 
   def serialize[T](t: T): Array[Byte] = {
     buf.writeClassAndObject(t)
@@ -98,11 +94,11 @@ class KryoSerializer(kryo: Kryo) extends Serializer {
   }
 
   def outputStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
+    new KryoSerializationStream(strat.kryo, strat.threadByteBuf.get(), s)
   }
 
   def inputStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
+    new KryoDeserializationStream(buf, s)
   }
 }
 
@@ -114,6 +110,14 @@ trait KryoRegistrator {
 class KryoSerialization extends SerializationStrategy with Logging {
   val kryo = createKryo()
 
+  val threadBuf = new ThreadLocal[ObjectBuffer] {
+    override def initialValue = new ObjectBuffer(kryo, 257*1024*1024)
+  }
+
+  val threadByteBuf = new ThreadLocal[ByteBuffer] {
+    override def initialValue = ByteBuffer.allocate(257*1024*1024)
+  }
+
   def createKryo(): Kryo = {
     val kryo = new Kryo()
 
@@ -158,5 +162,5 @@ class KryoSerialization extends SerializationStrategy with Logging {
     kryo
   }
 
-  def newSerializer(): Serializer = new KryoSerializer(kryo)
+  def newSerializer(): Serializer = new KryoSerializer(this)
 }