From 7ff2c754f340ba4c4077b0ff6285876eb7871c7b Mon Sep 17 00:00:00 2001 From: Reynold Xin <rxin@apache.org> Date: Wed, 25 Jun 2014 12:43:22 -0700 Subject: [PATCH] [SPARK-2270] Kryo cannot serialize results returned by asJavaIterable and thus groupBy/cogroup are broken in Java APIs when Kryo is used). @pwendell this should be merged into 1.0.1. Thanks @sorenmacbeth for reporting this & helping out with the fix. Author: Reynold Xin <rxin@apache.org> Closes #1206 from rxin/kryo-iterable-2270 and squashes the following commits: 09da0aa [Reynold Xin] Updated the comment. 009bf64 [Reynold Xin] [SPARK-2270] Kryo cannot serialize results returned by asJavaIterable (and thus groupBy/cogroup are broken in Java APIs when Kryo is used). --- .../spark/serializer/KryoSerializer.scala | 50 +++++++++++++++++++ .../serializer/KryoSerializerSuite.scala | 15 ++++++ 2 files changed, 65 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 5286f7b4c2..82b62aaf61 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -64,6 +64,9 @@ class KryoSerializer(conf: SparkConf) kryo.register(cls) } + // For results returned by asJavaIterable. See JavaIterableWrapperSerializer. + kryo.register(JavaIterableWrapperSerializer.wrapperClass, new JavaIterableWrapperSerializer) + // Allow sending SerializableWritable kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) @@ -183,3 +186,50 @@ private[serializer] object KryoSerializer { classOf[Array[Byte]] ) } + +/** + * A Kryo serializer for serializing results returned by asJavaIterable. + * + * The underlying object is scala.collection.convert.Wrappers$IterableWrapper. + * Kryo deserializes this into an AbstractCollection, which unfortunately doesn't work. + */ +private class JavaIterableWrapperSerializer + extends com.esotericsoftware.kryo.Serializer[java.lang.Iterable[_]] { + + import JavaIterableWrapperSerializer._ + + override def write(kryo: Kryo, out: KryoOutput, obj: java.lang.Iterable[_]): Unit = { + // If the object is the wrapper, simply serialize the underlying Scala Iterable object. + // Otherwise, serialize the object itself. + if (obj.getClass == wrapperClass && underlyingMethodOpt.isDefined) { + kryo.writeClassAndObject(out, underlyingMethodOpt.get.invoke(obj)) + } else { + kryo.writeClassAndObject(out, obj) + } + } + + override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]]) + : java.lang.Iterable[_] = { + kryo.readClassAndObject(in) match { + case scalaIterable: Iterable[_] => + scala.collection.JavaConversions.asJavaIterable(scalaIterable) + case javaIterable: java.lang.Iterable[_] => + javaIterable + } + } +} + +private object JavaIterableWrapperSerializer extends Logging { + // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper). + val wrapperClass = + scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass + + // Get the underlying method so we can use it to get the Scala collection for serialization. + private val underlyingMethodOpt = { + try Some(wrapperClass.getDeclaredMethod("underlying")) catch { + case e: Exception => + logError("Failed to find the underlying field in " + wrapperClass, e) + None + } + } +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index cdd6b3d8fe..79280d1a06 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -128,6 +128,21 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check(1.0 until 1000000.0 by 2.0) } + test("asJavaIterable") { + // Serialize a collection wrapped by asJavaIterable + val ser = new KryoSerializer(conf).newInstance() + val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345))) + val b = ser.deserialize[java.lang.Iterable[Int]](a) + assert(b.iterator().next() === 12345) + + // Serialize a normal Java collection + val col = new java.util.ArrayList[Int] + col.add(54321) + val c = ser.serialize(col) + val d = ser.deserialize[java.lang.Iterable[Int]](c) + assert(b.iterator().next() === 12345) + } + test("custom registrator") { val ser = new KryoSerializer(conf).newInstance() def check[T: ClassTag](t: T) { -- GitLab