Skip to content
Snippets Groups Projects
Commit 664e5fd2 authored by Hiral Patel's avatar Hiral Patel
Browse files

Fix reference bug in Kryo serializer, add test, update version

parent 9f0dc829
No related branches found
No related tags found
No related merge requests found
......@@ -157,27 +157,34 @@ class KryoSerializer extends spark.serializer.Serializer with Logging {
// Register maps with a special serializer since they have complex internal structure
class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any])
extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] {
//hack, look at https://groups.google.com/forum/#!msg/kryo-users/Eu5V4bxCfws/k-8UQ22y59AJ
private final val FAKE_REFERENCE = new Object()
override def write(
kryo: Kryo,
output: KryoOutput,
obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
kryo: Kryo,
output: KryoOutput,
obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) {
val map = obj.asInstanceOf[scala.collection.Map[Any, Any]]
kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer])
output.writeInt(map.size)
for ((k, v) <- map) {
kryo.writeClassAndObject(output, k)
kryo.writeClassAndObject(output, v)
}
}
override def read (
kryo: Kryo,
input: KryoInput,
cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
kryo: Kryo,
input: KryoInput,
cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]])
: Array[(Any, Any)] => scala.collection.Map[Any, Any] = {
val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue
kryo.reference(FAKE_REFERENCE)
val size = input.readInt()
val elems = new Array[(Any, Any)](size)
for (i <- 0 until size)
elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input))
for (i <- 0 until size) {
val k = kryo.readClassAndObject(input)
val v = kryo.readClassAndObject(input)
elems(i)=(k,v)
}
buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]]
}
}
......
......@@ -82,6 +82,7 @@ class KryoSerializerSuite extends FunSuite {
check(mutable.HashMap(1 -> "one", 2 -> "two"))
check(mutable.HashMap("one" -> 1, "two" -> 2))
check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4))))
check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
}
test("custom registrator") {
......
......@@ -132,7 +132,7 @@ object SparkBuild extends Build {
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"asm" % "asm-all" % "3.3.1",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"de.javakaffee" % "kryo-serializers" % "0.20",
"de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" % "akka-actor" % "2.0.3",
"com.typesafe.akka" % "akka-remote" % "2.0.3",
"com.typesafe.akka" % "akka-slf4j" % "2.0.3",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment