diff --git a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
index 2e383764993b83fd05f3acdbc90728325b22e400..7084ff97d90d28e20785483ca8d450db73e7eced 100644
--- a/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
+++ b/bagel/src/main/scala/spark/bagel/examples/WikipediaPageRankStandalone.scala
@@ -126,6 +126,11 @@ class WPRSerializerInstance extends SerializerInstance {
     throw new UnsupportedOperationException()
   }
 
+  // Class-loader-aware deserialization is likewise unsupported by this
+  // example serializer.
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    throw new UnsupportedOperationException()
+  }
+
   def outputStream(s: OutputStream): SerializationStream = {
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index e7cd4364ee0890bcf85e6bbf4be90eb5c297076c..80f615eeb0a942f183d63128a3adec119101fcbe 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -34,6 +34,18 @@ class JavaSerializerInstance extends SerializerInstance {
     in.readObject().asInstanceOf[T]
   }
 
+  // Variant of deserialize that resolves classes through the given
+  // ClassLoader instead of the default one.
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    val bis = new ByteArrayInputStream(bytes)
+    val ois = new ObjectInputStream(bis) {
+      // Resolve every class in the stream via the caller-supplied loader.
+      override def resolveClass(desc: ObjectStreamClass) =
+        Class.forName(desc.getName, false, loader)
+    }
+    ois.readObject.asInstanceOf[T]
+  }
+
   def outputStream(s: OutputStream): SerializationStream = {
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 7d25b965d26c8c131cc94191a90ab4cbaa94f0c1..5693613d6d45804767aeeab09c8990cb43babf43 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -9,6 +9,7 @@ import scala.collection.mutable
 
 import com.esotericsoftware.kryo._
 import com.esotericsoftware.kryo.{Serializer => KSerializer}
+import com.esotericsoftware.kryo.serialize.ClassSerializer
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 
 /**
@@ -100,6 +101,19 @@ class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
     buf.readClassAndObject(bytes).asInstanceOf[T]
   }
 
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
+    val oldClassLoader = ks.kryo.getClassLoader
+    try {
+      // Swap in the caller's loader so Kryo can resolve classes that are
+      // not visible to its current loader.
+      ks.kryo.setClassLoader(loader)
+      buf.readClassAndObject(bytes).asInstanceOf[T]
+    } finally {
+      // Restore the previous loader even if deserialization throws;
+      // otherwise the caller's loader would leak into later calls.
+      ks.kryo.setClassLoader(oldClassLoader)
+    }
+  }
+
   def outputStream(s: OutputStream): SerializationStream = {
@@ -129,6 +138,8 @@ class KryoSerializer extends Serializer with Logging {
   }
 
   def createKryo(): Kryo = {
+    // This is used so we can serialize/deserialize objects without a zero-arg
+    // constructor.
     val kryo = new KryoReflectionFactorySupport()
 
     // Register some commonly used classes
@@ -150,6 +161,10 @@ class KryoSerializer extends Serializer with Logging {
       kryo.register(obj.getClass)
     }
 
+    // Register the following classes for passing closures.
+    kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
+    kryo.setRegistrationOptional(true)
+
     // Register some commonly used Scala singleton objects. Because these
     // are singletons, we must return the exact same local object when we
     // deserialize rather than returning a clone as FieldSerializer would.
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
index 15fca9fcda1f479a065fa12fbb0a9f90f351fce9..2429bbfeb927445e887359465d54a8c8aafcade8 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -16,6 +16,8 @@ trait Serializer {
 trait SerializerInstance {
   def serialize[T](t: T): Array[Byte]
   def deserialize[T](bytes: Array[Byte]): T
+  // Deserialize, resolving classes through the given ClassLoader.
+  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
   def outputStream(s: OutputStream): SerializationStream
   def inputStream(s: InputStream): DeserializationStream
 }
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 58b5fa6bbd77fbad5a814989468a935f1320b616..b774e5e3b0322fa5836557846e4ed3dd6fafda7d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -12,27 +12,17 @@
  * Various utility methods used by Spark.
  */
 object Utils {
-  def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bos)
-    oos.writeObject(o)
-    oos.close
-    return bos.toByteArray
-  }
 
-  def deserialize[T](bytes: Array[Byte]): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-    return ois.readObject.asInstanceOf[T]
-  }
+  // The serializer in this object is used by Spark to serialize closures.
+  val serializerClass = System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
+  val ser = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
+
+  def serialize[T](o: T): Array[Byte] = ser.newInstance().serialize[T](o)
+
+  def deserialize[T](bytes: Array[Byte]): T = ser.newInstance().deserialize[T](bytes)
 
   def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis) {
-      override def resolveClass(desc: ObjectStreamClass) =
-        Class.forName(desc.getName, false, loader)
-    }
-    return ois.readObject.asInstanceOf[T]
+    ser.newInstance().deserialize[T](bytes, loader)
   }
 
   def isAlpha(c: Char): Boolean = {