Commit 4a3fb06a authored by Reynold Xin

Updated Kryo to 2.20.

parent 63fe4e9d
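For orientation before the diff: Kryo 1.x serialized objects into raw ByteBuffers (via ObjectBuffer and friends), while Kryo 2.x routes all reads and writes through its Output/Input wrappers. A minimal round-trip sketch of the 2.x API this commit migrates to (a standalone illustration, not code from the diff):

import java.io.ByteArrayOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object RoundTrip {
  def main(args: Array[String]) {
    val kryo = new Kryo()
    // 2.x style: write through an Output wrapper rather than into a ByteBuffer
    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeClassAndObject(output, Array(1, 2, 3))
    output.close()
    // ...and read back through an Input
    val input = new Input(bytes.toByteArray)
    val arr = kryo.readClassAndObject(input).asInstanceOf[Array[Int]]
    println(arr.mkString(","))  // prints 1,2,3
  }
}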
@@ -9,153 +9,80 @@ import scala.collection.mutable
 import com.esotericsoftware.kryo._
 import com.esotericsoftware.kryo.{Serializer => KSerializer}
-import com.esotericsoftware.kryo.serialize.ClassSerializer
-import com.esotericsoftware.kryo.serialize.SerializableSerializer
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
 import spark.broadcast._
 import spark.storage._
 
-/**
- * Zig-zag encoder used to write object sizes to serialization streams.
- * Based on Kryo's integer encoder.
- */
-private[spark] object ZigZag {
-  def writeInt(n: Int, out: OutputStream) {
-    var value = n
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    if ((value & ~0x7F) == 0) {
-      out.write(value)
-      return
-    }
-    out.write(((value & 0x7F) | 0x80))
-    value >>>= 7
-    out.write(value)
-  }
-
-  def readInt(in: InputStream): Int = {
-    var offset = 0
-    var result = 0
-    while (offset < 32) {
-      val b = in.read()
-      if (b == -1) {
-        throw new EOFException("End of stream")
-      }
-      result |= ((b & 0x7F) << offset)
-      if ((b & 0x80) == 0) {
-        return result
-      }
-      offset += 7
-    }
-    throw new SparkException("Malformed zigzag-encoded integer")
-  }
-}
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
-extends SerializationStream {
-  val channel = Channels.newChannel(out)
+private[spark]
+class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
+  val output = new KryoOutput(outStream)
 
   def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(threadBuffer, t)
-    ZigZag.writeInt(threadBuffer.position(), out)
-    threadBuffer.flip()
-    channel.write(threadBuffer)
-    threadBuffer.clear()
+    kryo.writeClassAndObject(output, t)
     this
   }
 
-  def flush() { out.flush() }
-  def close() { out.close() }
+  def flush() { output.flush() }
+  def close() { output.close() }
 }
 
-private[spark]
-class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
-extends DeserializationStream {
+private[spark]
+class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
+  val input = new KryoInput(inStream)
+
   def readObject[T](): T = {
-    val len = ZigZag.readInt(in)
-    objectBuffer.readClassAndObject(in, len).asInstanceOf[T]
+    try {
+      kryo.readClassAndObject(input).asInstanceOf[T]
+    } catch {
+      // DeserializationStream uses the EOF exception to indicate stopping condition.
+      case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
+    }
   }
 
-  def close() { in.close() }
+  def close() {
+    input.close()
+    inStream.close()
+  }
 }
 
 private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.kryo
-  val threadBuffer = ks.threadBuffer.get()
-  val objectBuffer = ks.objectBuffer.get()
+  val kryo = ks.kryo.get()
+  val output = ks.output.get()
+  val input = ks.input.get()
 
   def serialize[T](t: T): ByteBuffer = {
-    // Write it to our thread-local scratch buffer first to figure out the size, then return a new
-    // ByteBuffer of the appropriate size
-    threadBuffer.clear()
-    kryo.writeClassAndObject(threadBuffer, t)
-    val newBuf = ByteBuffer.allocate(threadBuffer.position)
-    threadBuffer.flip()
-    newBuf.put(threadBuffer)
-    newBuf.flip()
-    newBuf
+    output.clear()
+    kryo.writeClassAndObject(output, t)
+    ByteBuffer.wrap(output.toBytes)
   }
 
   def deserialize[T](bytes: ByteBuffer): T = {
-    kryo.readClassAndObject(bytes).asInstanceOf[T]
+    input.setBuffer(bytes.array)
+    kryo.readClassAndObject(input).asInstanceOf[T]
  }
 
   def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
     val oldClassLoader = kryo.getClassLoader
     kryo.setClassLoader(loader)
-    val obj = kryo.readClassAndObject(bytes).asInstanceOf[T]
+    input.setBuffer(bytes.array)
+    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
     kryo.setClassLoader(oldClassLoader)
     obj
   }
 
   def serializeStream(s: OutputStream): SerializationStream = {
-    threadBuffer.clear()
-    new KryoSerializationStream(kryo, threadBuffer, s)
+    new KryoSerializationStream(kryo, s)
   }
 
   def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(objectBuffer, s)
-  }
-
-  override def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
-    threadBuffer.clear()
-    while (iterator.hasNext) {
-      val element = iterator.next()
-      // TODO: Do we also want to write the object's size? Doesn't seem necessary.
-      kryo.writeClassAndObject(threadBuffer, element)
-    }
-    val newBuf = ByteBuffer.allocate(threadBuffer.position)
-    threadBuffer.flip()
-    newBuf.put(threadBuffer)
-    newBuf.flip()
-    newBuf
-  }
-
-  override def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
-    buffer.rewind()
-    new Iterator[Any] {
-      override def hasNext: Boolean = buffer.remaining > 0
-      override def next(): Any = kryo.readClassAndObject(buffer)
-    }
+    new KryoDeserializationStream(kryo, s)
   }
 }
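Two things the hunk above drops become unnecessary under Kryo 2.x: the hand-rolled ZigZag varint that length-prefixed every object on the stream, and the manual ByteBuffer shuffling around it. Output/Input buffer and frame values themselves, and hitting the end of a stream surfaces as a KryoException, which the new KryoDeserializationStream translates into the EOFException that DeserializationStream callers use as their stop signal. A sketch outside Spark (the file path is illustrative):

import java.io.{FileInputStream, FileOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}

object StreamSketch {
  def main(args: Array[String]) {
    val kryo = new Kryo()
    val out = new KryoOutput(new FileOutputStream("/tmp/objs.bin"))
    // No explicit length prefix: writeClassAndObject frames each value itself
    Seq(1, "two", 3.0).foreach(x => kryo.writeClassAndObject(out, x))
    out.close()

    val in = new KryoInput(new FileInputStream("/tmp/objs.bin"))
    println(kryo.readClassAndObject(in))  // 1
    println(kryo.readClassAndObject(in))  // two
    println(kryo.readClassAndObject(in))  // 3.0
    in.close()
  }
}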
@@ -171,18 +98,19 @@ trait KryoRegistrator {
  * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
  */
 class KryoSerializer extends spark.serializer.Serializer with Logging {
-  // Make this lazy so that it only gets called once we receive our first task on each executor,
-  // so we can pull out any custom Kryo registrator from the user's JARs.
-  lazy val kryo = createKryo()
-
-  val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "32").toInt * 1024 * 1024
+  val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
 
-  val objectBuffer = new ThreadLocal[ObjectBuffer] {
-    override def initialValue = new ObjectBuffer(kryo, bufferSize)
+  val kryo = new ThreadLocal[Kryo] {
+    override def initialValue = createKryo()
   }
 
-  val threadBuffer = new ThreadLocal[ByteBuffer] {
-    override def initialValue = ByteBuffer.allocate(bufferSize)
+  val output = new ThreadLocal[KryoOutput] {
+    override def initialValue = new KryoOutput(bufferSize)
   }
 
+  val input = new ThreadLocal[KryoInput] {
+    override def initialValue = new KryoInput(bufferSize)
+  }
+
   def createKryo(): Kryo = {
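For context, enabling this serializer in a Spark job of this era looked like the following, per the contemporary configuration docs. The 8 MB value is only an illustration for jobs whose largest serialized object exceeds the new 2 MB default above, and mypackage.MyRegistrator is a placeholder class name:

// Set before constructing the SparkContext
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryoserializer.buffer.mb", "8")
// Optionally register custom classes via a KryoRegistrator (see the trait above)
System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
val sc = new SparkContext("local", "My app")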
@@ -213,41 +141,44 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
       kryo.register(obj.getClass)
     }
 
-    // Register the following classes for passing closures.
-    kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
-    kryo.setRegistrationOptional(true)
-
     // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer())
+    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
+    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
 
     // Register some commonly used Scala singleton objects. Because these
     // are singletons, we must return the exact same local object when we
     // deserialize rather than returning a clone as FieldSerializer would.
-    class SingletonSerializer(obj: AnyRef) extends KSerializer {
-      override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {}
-      override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T]
+    class SingletonSerializer[T](obj: T) extends KSerializer[T] {
+      override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
+      override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
     }
-    kryo.register(None.getClass, new SingletonSerializer(None))
-    kryo.register(Nil.getClass, new SingletonSerializer(Nil))
+    kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
+    kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
 
     // Register maps with a special serializer since they have complex internal structure
     class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
-    extends KSerializer {
-      override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {
+      extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
+      override def write(
+          kryo: Kryo,
+          output: KryoOutput,
+          obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
         val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
-        kryo.writeObject(buf, map.size.asInstanceOf[java.lang.Integer])
+        kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer])
         for ((k, v) <- map) {
-          kryo.writeClassAndObject(buf, k)
-          kryo.writeClassAndObject(buf, v)
+          kryo.writeClassAndObject(output, k)
+          kryo.writeClassAndObject(output, v)
         }
       }
-      override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = {
-        val size = kryo.readObject(buf, classOf[java.lang.Integer]).intValue
+      override def read(
+          kryo: Kryo,
+          input: KryoInput,
+          cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
+        : Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
+        val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue
         val elems = new Array[(Any, Any)](size)
         for (i <- 0 until size)
-          elems(i) = (kryo.readClassAndObject(buf), kryo.readClassAndObject(buf))
-        buildMap(elems).asInstanceOf[T]
+          elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input))
+        buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
       }
     }
     kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
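For user code, the visible consequence of the hunk above is the new KSerializer shape: write(kryo, output, obj) and read(kryo, input, cls) replace 1.x's writeObjectData/readObjectData over ByteBuffers. A hedged sketch of a custom serializer plus registrator under the 2.x API (Point, PointSerializer, and MyRegistrator are illustrative names, not part of this commit):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input, Output}

case class Point(x: Int, y: Int)

class PointSerializer extends KSerializer[Point] {
  // Kryo 2.x hands each call the Kryo instance plus an Output/Input wrapper
  override def write(kryo: Kryo, output: Output, p: Point) {
    output.writeInt(p.x)
    output.writeInt(p.y)
  }
  override def read(kryo: Kryo, input: Input, cls: Class[Point]): Point =
    Point(input.readInt(), input.readInt())
}

class MyRegistrator extends spark.KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Point], new PointSerializer)
  }
}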
@@ -120,7 +120,7 @@ object SparkBuild extends Build {
       "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
       "asm" % "asm-all" % "3.3.1",
       "com.google.protobuf" % "protobuf-java" % "2.4.1",
-      "de.javakaffee" % "kryo-serializers" % "0.9",
+      "de.javakaffee" % "kryo-serializers" % "0.20",
       "com.typesafe.akka" % "akka-actor" % "2.0.3",
       "com.typesafe.akka" % "akka-remote" % "2.0.3",
       "com.typesafe.akka" % "akka-slf4j" % "2.0.3",