Skip to content
Snippets Groups Projects
Commit 968f75f6 authored by Reynold Xin's avatar Reynold Xin
Browse files

Added an option (spark.closure.serializer) to specify the serializer for

closures. This enables using Kryo as the closure serializer.
parent 8c95a854
No related branches found
No related tags found
No related merge requests found
...@@ -126,6 +126,10 @@ class WPRSerializerInstance extends SerializerInstance { ...@@ -126,6 +126,10 @@ class WPRSerializerInstance extends SerializerInstance {
throw new UnsupportedOperationException() throw new UnsupportedOperationException()
} }
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
throw new UnsupportedOperationException()
}
def outputStream(s: OutputStream): SerializationStream = { def outputStream(s: OutputStream): SerializationStream = {
new WPRSerializationStream(s) new WPRSerializationStream(s)
} }
......
...@@ -34,6 +34,15 @@ class JavaSerializerInstance extends SerializerInstance { ...@@ -34,6 +34,15 @@ class JavaSerializerInstance extends SerializerInstance {
in.readObject().asInstanceOf[T] in.readObject().asInstanceOf[T]
} }
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
val bis = new ByteArrayInputStream(bytes)
val ois = new ObjectInputStream(bis) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
return ois.readObject.asInstanceOf[T]
}
def outputStream(s: OutputStream): SerializationStream = { def outputStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s) new JavaSerializationStream(s)
} }
......
...@@ -9,6 +9,7 @@ import scala.collection.mutable ...@@ -9,6 +9,7 @@ import scala.collection.mutable
import com.esotericsoftware.kryo._ import com.esotericsoftware.kryo._
import com.esotericsoftware.kryo.{Serializer => KSerializer} import com.esotericsoftware.kryo.{Serializer => KSerializer}
import com.esotericsoftware.kryo.serialize.ClassSerializer
import de.javakaffee.kryoserializers.KryoReflectionFactorySupport import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
/** /**
...@@ -100,6 +101,14 @@ class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { ...@@ -100,6 +101,14 @@ class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
buf.readClassAndObject(bytes).asInstanceOf[T] buf.readClassAndObject(bytes).asInstanceOf[T]
} }
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
val oldClassLoader = ks.kryo.getClassLoader
ks.kryo.setClassLoader(loader)
val obj = buf.readClassAndObject(bytes).asInstanceOf[T]
ks.kryo.setClassLoader(oldClassLoader)
obj
}
def outputStream(s: OutputStream): SerializationStream = { def outputStream(s: OutputStream): SerializationStream = {
new KryoSerializationStream(ks.kryo, ks.threadByteBuf.get(), s) new KryoSerializationStream(ks.kryo, ks.threadByteBuf.get(), s)
} }
...@@ -129,6 +138,8 @@ class KryoSerializer extends Serializer with Logging { ...@@ -129,6 +138,8 @@ class KryoSerializer extends Serializer with Logging {
} }
def createKryo(): Kryo = { def createKryo(): Kryo = {
// This is used so we can serialize/deserialize objects without a zero-arg
// constructor.
val kryo = new KryoReflectionFactorySupport() val kryo = new KryoReflectionFactorySupport()
// Register some commonly used classes // Register some commonly used classes
...@@ -150,6 +161,10 @@ class KryoSerializer extends Serializer with Logging { ...@@ -150,6 +161,10 @@ class KryoSerializer extends Serializer with Logging {
kryo.register(obj.getClass) kryo.register(obj.getClass)
} }
// Register the following classes for passing closures.
kryo.register(classOf[Class[_]], new ClassSerializer(kryo))
kryo.setRegistrationOptional(true)
// Register some commonly used Scala singleton objects. Because these // Register some commonly used Scala singleton objects. Because these
// are singletons, we must return the exact same local object when we // are singletons, we must return the exact same local object when we
// deserialize rather than returning a clone as FieldSerializer would. // deserialize rather than returning a clone as FieldSerializer would.
......
...@@ -16,6 +16,7 @@ trait Serializer { ...@@ -16,6 +16,7 @@ trait Serializer {
trait SerializerInstance { trait SerializerInstance {
def serialize[T](t: T): Array[Byte] def serialize[T](t: T): Array[Byte]
def deserialize[T](bytes: Array[Byte]): T def deserialize[T](bytes: Array[Byte]): T
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
def outputStream(s: OutputStream): SerializationStream def outputStream(s: OutputStream): SerializationStream
def inputStream(s: InputStream): DeserializationStream def inputStream(s: InputStream): DeserializationStream
} }
......
...@@ -12,27 +12,17 @@ import scala.util.Random ...@@ -12,27 +12,17 @@ import scala.util.Random
* Various utility methods used by Spark. * Various utility methods used by Spark.
*/ */
object Utils { object Utils {
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(o)
oos.close
return bos.toByteArray
}
def deserialize[T](bytes: Array[Byte]): T = { // The serializer in this object is used by Spark to serialize closures.
val bis = new ByteArrayInputStream(bytes) val serializerClass = System.getProperty("spark.closure.serializer", "spark.JavaSerializer")
val ois = new ObjectInputStream(bis) val ser = Class.forName(serializerClass).newInstance().asInstanceOf[Serializer]
return ois.readObject.asInstanceOf[T]
} def serialize[T](o: T): Array[Byte] = ser.newInstance().serialize[T](o)
def deserialize[T](bytes: Array[Byte]): T = ser.newInstance().deserialize[T](bytes)
def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = { def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
val bis = new ByteArrayInputStream(bytes) ser.newInstance().deserialize[T](bytes, loader)
val ois = new ObjectInputStream(bis) {
override def resolveClass(desc: ObjectStreamClass) =
Class.forName(desc.getName, false, loader)
}
return ois.readObject.asInstanceOf[T]
} }
def isAlpha(c: Char): Boolean = { def isAlpha(c: Char): Boolean = {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment