diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala
index f17051f5a86db8a821c3f24f9527e42c34d3e1ad..f6e818adb0a9c397940becb94f8f6e3400b3338d 100644
--- a/core/src/main/scala/spark/KryoSerialization.scala
+++ b/core/src/main/scala/spark/KryoSerialization.scala
@@ -117,7 +117,7 @@ class KryoSerialization extends SerializationStrategy with Logging {
     val kryo = new Kryo()
     val toRegister: Seq[AnyRef] = Seq(
       // Arrays
-      Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""),
+      Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
       // Specialized Tuple2s
       ("", ""), (1, 1), (1.0, 1.0), (1L, 1L),
       (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala
new file mode 100644
index 0000000000000000000000000000000000000000..cbc64736e6f494ac757279f87cfcb259b976b489
--- /dev/null
+++ b/core/src/main/scala/spark/SerializingCache.scala
@@ -0,0 +1,31 @@
+package spark
+
+import java.io._
+
+/**
+ * Wrapper around a BoundedMemoryCache that stores serialized objects as
+ * byte arrays in order to reduce storage cost and GC overhead
+ */
+class SerializingCache extends Cache with Logging {
+  val bmc = new BoundedMemoryCache
+
+  /** Serializes `value` and stores the resulting byte array under `key`. */
+  override def put(key: Any, value: Any) {
+    // Fresh serializer instance per call — presumably instances are not
+    // safe to share across threads; TODO confirm against Serializer.
+    val ser = Serializer.newInstance()
+    bmc.put(key, ser.serialize(value))
+  }
+
+  /**
+   * Looks up `key` and deserializes the stored bytes, or returns null when
+   * the key is absent (mirroring the underlying cache's null convention).
+   */
+  override def get(key: Any): Any = {
+    // Type pattern both guards against a null/missing entry (null never
+    // matches a type pattern) and replaces the asInstanceOf downcast.
+    bmc.get(key) match {
+      case bytes: Array[Byte] =>
+        val ser = Serializer.newInstance()
+        ser.deserialize(bytes)
+      case _ => null
+    }
+  }
+}