Skip to content
Snippets Groups Projects
Commit 3334b7c6 authored by Matei Zaharia's avatar Matei Zaharia
Browse files

Merge pull request #341 from rxin/4a3fb06a

Kryo2 update against Spark master
parents 5e51b889 4a3fb06a
No related branches found
No related tags found
No related merge requests found
...@@ -9,153 +9,80 @@ import scala.collection.mutable ...@@ -9,153 +9,80 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._ import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer} import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.serialize.ClassSerializer import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serialize.SerializableSerializer import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
import serializer.{SerializerInstance, DeserializationStream, SerializationStream} import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
import spark.broadcast._ import spark.broadcast._
import spark.storage._ import spark.storage._
/** private[spark]
* Zig-zag encoder used to write object sizes to serialization streams. class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
* Based on Kryo's integer encoder.
*/
private[spark] object ZigZag {
def writeInt(n: Int, out: OutputStream) {
var value = n
if ((value & ~0x7F) == 0) {
out.write(value)
return
}
out.write(((value & 0x7F) | 0x80))
value >>>= 7
if ((value & ~0x7F) == 0) {
out.write(value)
return
}
out.write(((value & 0x7F) | 0x80))
value >>>= 7
if ((value & ~0x7F) == 0) {
out.write(value)
return
}
out.write(((value & 0x7F) | 0x80))
value >>>= 7
if ((value & ~0x7F) == 0) {
out.write(value)
return
}
out.write(((value & 0x7F) | 0x80))
value >>>= 7
out.write(value)
}
def readInt(in: InputStream): Int = { val output = new KryoOutput(outStream)
var offset = 0
var result = 0
while (offset < 32) {
val b = in.read()
if (b == -1) {
throw new EOFException("End of stream")
}
result |= ((b & 0x7F) << offset)
if ((b & 0x80) == 0) {
return result
}
offset += 7
}
throw new SparkException("Malformed zigzag-encoded integer")
}
}
private[spark]
class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
extends SerializationStream {
val channel = Channels.newChannel(out)
def writeObject[T](t: T): SerializationStream = { def writeObject[T](t: T): SerializationStream = {
kryo.writeClassAndObject(threadBuffer, t) kryo.writeClassAndObject(output, t)
ZigZag.writeInt(threadBuffer.position(), out)
threadBuffer.flip()
channel.write(threadBuffer)
threadBuffer.clear()
this this
} }
def flush() { out.flush() } def flush() { output.flush() }
def close() { out.close() } def close() { output.close() }
} }
private[spark] private[spark]
class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream) class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
extends DeserializationStream {
val input = new KryoInput(inStream)
def readObject[T](): T = { def readObject[T](): T = {
val len = ZigZag.readInt(in) try {
objectBuffer.readClassAndObject(in, len).asInstanceOf[T] kryo.readClassAndObject(input).asInstanceOf[T]
} catch {
// DeserializationStream uses the EOF exception to indicate stopping condition.
case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException
}
} }
def close() { in.close() } def close() {
input.close()
inStream.close()
}
} }
private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
val kryo = ks.kryo
val threadBuffer = ks.threadBuffer.get() val kryo = ks.kryo.get()
val objectBuffer = ks.objectBuffer.get() val output = ks.output.get()
val input = ks.input.get()
def serialize[T](t: T): ByteBuffer = { def serialize[T](t: T): ByteBuffer = {
// Write it to our thread-local scratch buffer first to figure out the size, then return a new output.clear()
// ByteBuffer of the appropriate size kryo.writeClassAndObject(output, t)
threadBuffer.clear() ByteBuffer.wrap(output.toBytes)
kryo.writeClassAndObject(threadBuffer, t)
val newBuf = ByteBuffer.allocate(threadBuffer.position)
threadBuffer.flip()
newBuf.put(threadBuffer)
newBuf.flip()
newBuf
} }
def deserialize[T](bytes: ByteBuffer): T = { def deserialize[T](bytes: ByteBuffer): T = {
kryo.readClassAndObject(bytes).asInstanceOf[T] input.setBuffer(bytes.array)
kryo.readClassAndObject(input).asInstanceOf[T]
} }
def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
val oldClassLoader = kryo.getClassLoader val oldClassLoader = kryo.getClassLoader
kryo.setClassLoader(loader) kryo.setClassLoader(loader)
val obj = kryo.readClassAndObject(bytes).asInstanceOf[T] input.setBuffer(bytes.array)
val obj = kryo.readClassAndObject(input).asInstanceOf[T]
kryo.setClassLoader(oldClassLoader) kryo.setClassLoader(oldClassLoader)
obj obj
} }
def serializeStream(s: OutputStream): SerializationStream = { def serializeStream(s: OutputStream): SerializationStream = {
threadBuffer.clear() new KryoSerializationStream(kryo, s)
new KryoSerializationStream(kryo, threadBuffer, s)
} }
def deserializeStream(s: InputStream): DeserializationStream = { def deserializeStream(s: InputStream): DeserializationStream = {
new KryoDeserializationStream(objectBuffer, s) new KryoDeserializationStream(kryo, s)
}
override def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
threadBuffer.clear()
while (iterator.hasNext) {
val element = iterator.next()
// TODO: Do we also want to write the object's size? Doesn't seem necessary.
kryo.writeClassAndObject(threadBuffer, element)
}
val newBuf = ByteBuffer.allocate(threadBuffer.position)
threadBuffer.flip()
newBuf.put(threadBuffer)
newBuf.flip()
newBuf
}
override def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
buffer.rewind()
new Iterator[Any] {
override def hasNext: Boolean = buffer.remaining > 0
override def next(): Any = kryo.readClassAndObject(buffer)
}
} }
} }
...@@ -171,18 +98,19 @@ trait KryoRegistrator { ...@@ -171,18 +98,19 @@ trait KryoRegistrator {
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]]. * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
*/ */
class KryoSerializer extends spark.serializer.Serializer with Logging { class KryoSerializer extends spark.serializer.Serializer with Logging {
// Make this lazy so that it only gets called once we receive our first task on each executor,
// so we can pull out any custom Kryo registrator from the user's JARs.
lazy val kryo = createKryo()
val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "32").toInt * 1024 * 1024 val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
val objectBuffer = new ThreadLocal[ObjectBuffer] { val kryo = new ThreadLocal[Kryo] {
override def initialValue = new ObjectBuffer(kryo, bufferSize) override def initialValue = createKryo()
} }
val threadBuffer = new ThreadLocal[ByteBuffer] { val output = new ThreadLocal[KryoOutput] {
override def initialValue = ByteBuffer.allocate(bufferSize) override def initialValue = new KryoOutput(bufferSize)
}
val input = new ThreadLocal[KryoInput] {
override def initialValue = new KryoInput(bufferSize)
} }
def createKryo(): Kryo = { def createKryo(): Kryo = {
...@@ -213,41 +141,44 @@ class KryoSerializer extends spark.serializer.Serializer with Logging { ...@@ -213,41 +141,44 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
kryo.register(obj.getClass) kryo.register(obj.getClass)
} }
// Register the following classes for passing closures.
kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
kryo.setRegistrationOptional(true)
// Allow sending SerializableWritable // Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer()) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
// Register some commonly used Scala singleton objects. Because these // Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we // are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would. // deserialize rather than returning a clone as FieldSerializer would.
class SingletonSerializer(obj: AnyRef) extends KSerializer { class SingletonSerializer[T](obj: T) extends KSerializer[T] {
override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {} override def write(kryo: Kryo, output: KryoOutput, obj: T) {}
override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T] override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj
} }
kryo.register(None.getClass, new SingletonSerializer(None)) kryo.register(None.getClass, new SingletonSerializer[AnyRef](None))
kryo.register(Nil.getClass, new SingletonSerializer(Nil)) kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil))
// Register maps with a special serializer since they have complex internal structure // Register maps with a special serializer since they have complex internal structure
class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any]) class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
extends KSerializer { extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
override def writeObjectData(buf: ByteBuffer, obj: AnyRef) { override def write(
kryo: Kryo,
output: KryoOutput,
obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
val map = obj.asInstanceOf[scala.collection.Map[Any, Any]] val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
kryo.writeObject(buf, map.size.asInstanceOf[java.lang.Integer]) kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer])
for ((k, v) <- map) { for ((k, v) <- map) {
kryo.writeClassAndObject(buf, k) kryo.writeClassAndObject(output, k)
kryo.writeClassAndObject(buf, v) kryo.writeClassAndObject(output, v)
} }
} }
override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = { override def read (
val size = kryo.readObject(buf, classOf[java.lang.Integer]).intValue kryo: Kryo,
input: KryoInput,
cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
: Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue
val elems = new Array[(Any, Any)](size) val elems = new Array[(Any, Any)](size)
for (i <- 0 until size) for (i <- 0 until size)
elems(i) = (kryo.readClassAndObject(buf), kryo.readClassAndObject(buf)) elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input))
buildMap(elems).asInstanceOf[T] buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
} }
} }
kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _)) kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _))
......
...@@ -125,7 +125,7 @@ object SparkBuild extends Build { ...@@ -125,7 +125,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"asm" % "asm-all" % "3.3.1", "asm" % "asm-all" % "3.3.1",
"com.google.protobuf" % "protobuf-java" % "2.4.1", "com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.9", "de.javakaffee" % "kryo-serializers" % "0.20",
"com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-actor" % "2.0.3",
"com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3",
"com.typesafe.akka" % "akka-slf4j" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment