From 7febdfbe2952aeef5e5f0a1cdbc95df4a274fd78 Mon Sep 17 00:00:00 2001
From: Matei Zaharia <matei@eecs.berkeley.edu>
Date: Tue, 8 Mar 2011 12:36:36 -0800
Subject: [PATCH] Better reuse of buffers in Kryo serialization

---
 .../main/scala/spark/KryoSerialization.scala  | 26 +++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala
index 462dee217e..63e22ae4ae 100644
--- a/core/src/main/scala/spark/KryoSerialization.scala
+++ b/core/src/main/scala/spark/KryoSerialization.scala
@@ -58,10 +58,8 @@ object ZigZag {
   }
 }
 
-class KryoSerializationStream(kryo: Kryo, out: OutputStream)
+class KryoSerializationStream(kryo: Kryo, buf: ByteBuffer, out: OutputStream)
 extends SerializationStream {
-  val buf = ByteBuffer.allocateDirect(1024*1024)
-
   def writeObject[T](t: T) {
     kryo.writeClassAndObject(buf, t)
     ZigZag.writeInt(buf.position(), out)
@@ -74,10 +72,8 @@ extends SerializationStream {
   def close() { out.close() }
 }
 
-class KryoDeserializationStream(kryo: Kryo, in: InputStream)
+class KryoDeserializationStream(buf: ObjectBuffer, in: InputStream)
 extends DeserializationStream {
-  val buf = new ObjectBuffer(kryo, 1024*1024)
-
   def readObject[T](): T = {
     val len = ZigZag.readInt(in)
     buf.readClassAndObject(in, len).asInstanceOf[T]
@@ -86,8 +82,8 @@ extends DeserializationStream {
   def close() { in.close() }
 }
 
-class KryoSerializer(kryo: Kryo) extends Serializer {
-  val buf = new ObjectBuffer(kryo, 1024*1024)
+class KryoSerializer(strat: KryoSerialization) extends Serializer {
+  val buf = strat.threadBuf.get()
 
   def serialize[T](t: T): Array[Byte] = {
     buf.writeClassAndObject(t)
@@ -98,11 +94,11 @@ class KryoSerializer(kryo: Kryo) extends Serializer {
   }
 
   def outputStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
+    new KryoSerializationStream(strat.kryo, strat.threadByteBuf.get(), s)
   }
 
   def inputStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
+    new KryoDeserializationStream(buf, s)
   }
 }
 
@@ -114,6 +110,14 @@ trait KryoRegistrator {
 class KryoSerialization extends SerializationStrategy with Logging {
   val kryo = createKryo()
 
+  val threadBuf = new ThreadLocal[ObjectBuffer] {
+    override def initialValue = new ObjectBuffer(kryo, 128*1024*1024)
+  }
+
+  val threadByteBuf = new ThreadLocal[ByteBuffer] {
+    override def initialValue = ByteBuffer.allocate(128*1024*1024)
+  }
+
   def createKryo(): Kryo = {
     val kryo = new Kryo()
 
@@ -158,5 +162,5 @@ class KryoSerialization extends SerializationStrategy with Logging {
     kryo
   }
 
-  def newSerializer(): Serializer = new KryoSerializer(kryo)
+  def newSerializer(): Serializer = new KryoSerializer(this)
 }
-- 
GitLab