diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 44b630e4788563e0b628b6b59382cdecb1c6f325..f24196ea49e04b3b74db9ad15ddaa12aba273843 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -9,153 +9,80 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo._
 import com.esotericsoftware.kryo.{Serializer => KSerializer}
-import com.esotericsoftware.kryo.serialize.ClassSerializer
-import com.esotericsoftware.kryo.serialize.SerializableSerializer
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 
 import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
 import spark.broadcast._
 import spark.storage._
 
-/**
- * Zig-zag encoder used to write object sizes to serialization streams.
- * Based on Kryo's integer encoder.
- */
-private[spark] object ZigZag {
-  def writeInt(n: Int, out: OutputStream) {
-    var value = n
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    out.write(value)
-  }
+private[spark]
+class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
 
-  def readInt(in: InputStream): Int = {
-    var offset = 0
-    var result = 0
-    while (offset < 32) {
-      val b = in.read()
-      if (b == -1) {
-        throw new EOFException("End of stream")
-      }
-      result |= ((b & 0x7F) << offset)
-      if ((b & 0x80) == 0) {
-        return result
-      }
-      offset += 7
-    }
-    throw new SparkException("Malformed zigzag-encoded integer")
-  }
-}
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
-extends SerializationStream {
-  val channel = Channels.newChannel(out)
+  val output = new KryoOutput(outStream)
 
   def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(threadBuffer, t)
-    ZigZag.writeInt(threadBuffer.position(), out)
-    threadBuffer.flip()
-    channel.write(threadBuffer)
-    threadBuffer.clear()
+    kryo.writeClassAndObject(output, t)
     this
   }
 
-  def flush() { out.flush() }
-  def close() { out.close() }
+  def flush() { output.flush() }
+  def close() { output.close() }
 }
 
-private[spark]
-class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
-extends DeserializationStream {
+private[spark]
+class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
+
+  val input = new KryoInput(inStream)
+
   def readObject[T](): T = {
-    val len = ZigZag.readInt(in)
-    objectBuffer.readClassAndObject(in, len).asInstanceOf[T]
+    try {
+      kryo.readClassAndObject(input).asInstanceOf[T]
+    } catch {
+      // DeserializationStream uses the EOF exception to indicate the stopping condition.
+      case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
+    }
   }
 
-  def close() { in.close() }
+  def close() {
+    input.close()
+    inStream.close()
+  }
 }
 
 private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.kryo
-  val threadBuffer = ks.threadBuffer.get()
-  val objectBuffer = ks.objectBuffer.get()
+
+  val kryo = ks.kryo.get()
+  val output = ks.output.get()
+  val input = ks.input.get()
 
   def serialize[T](t: T): ByteBuffer = {
-    // Write it to our thread-local scratch buffer first to figure out the size, then return a new
-    // ByteBuffer of the appropriate size
-    threadBuffer.clear()
-    kryo.writeClassAndObject(threadBuffer, t)
-    val newBuf = ByteBuffer.allocate(threadBuffer.position)
-    threadBuffer.flip()
-    newBuf.put(threadBuffer)
-    newBuf.flip()
-    newBuf
+    output.clear()
+    kryo.writeClassAndObject(output, t)
+    ByteBuffer.wrap(output.toBytes)
   }
 
   def deserialize[T](bytes: ByteBuffer): T = {
-    kryo.readClassAndObject(bytes).asInstanceOf[T]
+    input.setBuffer(bytes.array)
+    kryo.readClassAndObject(input).asInstanceOf[T]
   }
 
   def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
     val oldClassLoader = kryo.getClassLoader
     kryo.setClassLoader(loader)
-    val obj = kryo.readClassAndObject(bytes).asInstanceOf[T]
+    input.setBuffer(bytes.array)
+    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
     kryo.setClassLoader(oldClassLoader)
     obj
   }
 
   def serializeStream(s: OutputStream): SerializationStream = {
-    threadBuffer.clear()
-    new KryoSerializationStream(kryo, threadBuffer, s)
+    new KryoSerializationStream(kryo, s)
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(objectBuffer, s)
-  }
-
-  override def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
-    threadBuffer.clear()
-    while (iterator.hasNext) {
-      val element = iterator.next()
-      // TODO: Do we also want to write the object's size? Doesn't seem necessary.
-      kryo.writeClassAndObject(threadBuffer, element)
-    }
-    val newBuf = ByteBuffer.allocate(threadBuffer.position)
-    threadBuffer.flip()
-    newBuf.put(threadBuffer)
-    newBuf.flip()
-    newBuf
-  }
-
-  override def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
-    buffer.rewind()
-    new Iterator[Any] {
-      override def hasNext: Boolean = buffer.remaining > 0
-      override def next(): Any = kryo.readClassAndObject(buffer)
-    }
+    new KryoDeserializationStream(kryo, s)
   }
 }
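
For reviewers unfamiliar with the Kryo 2.x API, the standalone sketch below (not part of the patch; all names are illustrative) shows the bare write/read cycle that the two stream classes above wrap, including the end-of-stream convention: Kryo 2 signals stream exhaustion with a KryoException, which KryoDeserializationStream translates into the java.io.EOFException that DeserializationStream callers expect.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    object StreamRoundTrip {
      def main(args: Array[String]) {
        val kryo = new Kryo()
        val bytes = new ByteArrayOutputStream

        // Write several objects back to back; Output buffers and flushes them.
        val output = new Output(bytes)
        Seq("a", "b", "c").foreach(s => kryo.writeClassAndObject(output, s))
        output.close()

        // Read until the stream is exhausted. Kryo 2 reports this as a
        // KryoException rather than returning a sentinel value.
        val input = new Input(new ByteArrayInputStream(bytes.toByteArray))
        try {
          while (true) println(kryo.readClassAndObject(input))
        } catch {
          case e: com.esotericsoftware.kryo.KryoException => // end of stream reached
        }
      }
    }
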
@@ -171,18 +98,19 @@ trait KryoRegistrator {
- * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ * A Spark serializer that uses the [[http://code.google.com/p/kryo/ Kryo 2 library]].
  */
 class KryoSerializer extends spark.serializer.Serializer with Logging {
-  // Make this lazy so that it only gets called once we receive our first task on each executor,
-  // so we can pull out any custom Kryo registrator from the user's JARs.
-  lazy val kryo = createKryo()
 
-  val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "32").toInt * 1024 * 1024
+  val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
 
-  val objectBuffer = new ThreadLocal[ObjectBuffer] {
-    override def initialValue = new ObjectBuffer(kryo, bufferSize)
+  val kryo = new ThreadLocal[Kryo] {
+    override def initialValue = createKryo()
   }
 
-  val threadBuffer = new ThreadLocal[ByteBuffer] {
-    override def initialValue = ByteBuffer.allocate(bufferSize)
+  val output = new ThreadLocal[KryoOutput] {
+    override def initialValue = new KryoOutput(bufferSize)
+  }
+
+  val input = new ThreadLocal[KryoInput] {
+    override def initialValue = new KryoInput(bufferSize)
   }
 
   def createKryo(): Kryo = {
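
A note on the sizing change above: the property now provisions a fixed-capacity KryoOutput per thread rather than a scratch ByteBuffer, which is why the default drops from 32 MB to 2 MB. A minimal sketch of the arithmetic and the per-thread pattern (standalone; names are illustrative):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Output => KryoOutput}

    object BufferSizing {
      // "spark.kryoserializer.buffer.mb" is interpreted as megabytes; 2 MB default.
      val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024

      // One Kryo and one Output per thread, since neither class is thread-safe.
      val kryo = new ThreadLocal[Kryo] {
        override def initialValue = new Kryo()
      }
      val output = new ThreadLocal[KryoOutput] {
        // A fixed-capacity Output throws a KryoException once a single
        // serialized object outgrows it, so large records need a bigger setting.
        override def initialValue = new KryoOutput(bufferSize)
      }
    }
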
@@ -213,41 +141,44 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
       kryo.register(obj.getClass)
     }
 
-    // Register the following classes for passing closures.
-    kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
-    kryo.setRegistrationOptional(true)
-
     // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
+    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Register some commonly used Scala singleton objects. Because these
     // are singletons, we must return the exact same local object when we
     // deserialize rather than returning a clone as FieldSerializer would.
-    class SingletonSerializer(obj: AnyRef) extends KSerializer {
-      override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {}
-      override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T]
+    class SingletonSerializer[T](obj: T) extends KSerializer[T] {
+      override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
+      override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
     }
-    kryo.register(None.getClass, new SingletonSerializer(None))
-    kryo.register(Nil.getClass, new SingletonSerializer(Nil))
+    kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
+    kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
 
     // Register maps with a special serializer since they have complex internal structure
     class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
-    extends KSerializer {
-      override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
+      extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
+      override def write(
+          kryo: Kryo,
+          output: KryoOutput,
+          obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
         val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
-        kryo.writeObject(buf, map.size.asInstanceOf[java.lang.Integer])
+        kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer])
         for ((k, v) <- map) {
-          kryo.writeClassAndObject(buf, k)
-          kryo.writeClassAndObject(buf, v)
+          kryo.writeClassAndObject(output, k)
+          kryo.writeClassAndObject(output, v)
         }
       }
-      override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = {
-        val size = kryo.readObject(buf, classOf[java.lang.Integer]).intValue
+      override def read(
+          kryo: Kryo,
+          input: KryoInput,
+          cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
+        : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
+        val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue
         val elems = new Array[(Any, Any)](size)
         for (i <- 0 until size)
-          elems(i) = (kryo.readClassAndObject(buf), kryo.readClassAndObject(buf))
-        buildMap(elems).asInstanceOf[T]
+          elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input))
+        buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
       }
     }
     kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
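
The hunk above also illustrates the main 1.x-to-2.x serializer migration: writeObjectData/readObjectData over ByteBuffers become typed write/read over Output/Input. For comparison, here is a hypothetical user-side registrator in the same style; Point and MyRegistrator are illustrative and not part of the patch.

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.{Serializer => KSerializer}
    import com.esotericsoftware.kryo.io.{Input, Output}

    case class Point(x: Double, y: Double)

    class MyRegistrator extends spark.KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        // A custom Kryo 2 serializer writes fields to Output and
        // reconstructs the object from Input.
        kryo.register(classOf[Point], new KSerializer[Point] {
          override def write(kryo: Kryo, output: Output, p: Point) {
            output.writeDouble(p.x)
            output.writeDouble(p.y)
          }
          override def read(kryo: Kryo, input: Input, cls: Class[Point]): Point =
            Point(input.readDouble(), input.readDouble())
        })
      }
    }
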
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 34b93fb694478690f695099f936b4414ad386888..842d0fa96bd034e14e5fc5fed88c124e97eda0b8 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -125,7 +125,7 @@ object SparkBuild extends Build {
       "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
       "asm" % "asm-all" % "3.3.1",
       "com.google.protobuf" % "protobuf-java" % "2.4.1",
-      "de.javakaffee" % "kryo-serializers" % "0.9",
+      "de.javakaffee" % "kryo-serializers" % "0.20",
       "com.typesafe.akka" % "akka-actor" % "2.0.3",
       "com.typesafe.akka" % "akka-remote" % "2.0.3",
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
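
Bumping de.javakaffee:kryo-serializers from 0.9 to 0.20 is what moves the build from a Kryo 1.x to a Kryo 2.x transitive dependency, matching the API migration above. For completeness, a sketch of how a job would opt in on the driver side; the property names beyond the one visible in the patch are assumptions from this Spark version's configuration surface, and MyRegistrator is the hypothetical class from the earlier sketch.

    object EnableKryo {
      def main(args: Array[String]) {
        // Select the Kryo serializer and, optionally, a custom registrator
        // and a larger per-thread buffer (the default drops to 2 MB here).
        System.setProperty("spark.serializer", "spark.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "MyRegistrator") // hypothetical user class
        System.setProperty("spark.kryoserializer.buffer.mb", "8")
        // ... construct the SparkContext after these properties are set ...
      }
    }
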