diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 64ba27f34d2f11535eb7f60b62a70ec26c304a43..217957963437d5407b45d84761a9822e68c4abb8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -177,6 +177,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) try { kryo.writeClassAndObject(output, t) } catch { @@ -202,6 +203,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ } override def serializeStream(s: OutputStream): SerializationStream = { + kryo.reset() // We must reset in case this serializer instance was reused (see SPARK-7766) new KryoSerializationStream(kryo, s) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index c7369de24b81f817374b9e4191e5d5a8bfe34d37..0bd91a8dba2ab3c05faada5f2a7e78867899ba29 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.serializer +import java.io.ByteArrayOutputStream + import scala.collection.mutable import scala.reflect.ClassTag @@ -319,6 +321,37 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val ser2 = new KryoSerializer(conf).newInstance().asInstanceOf[KryoSerializerInstance] assert(!ser2.getAutoReset) } + + private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = { + val conf = new SparkConf(loadDefaults = false) + .set("spark.kryo.referenceTracking", referenceTracking.toString) + if (!autoReset) { + conf.set("spark.kryo.registrator", classOf[RegistratorWithoutAutoReset].getName) + } + val ser = new KryoSerializer(conf) + val serInstance = ser.newInstance().asInstanceOf[KryoSerializerInstance] + assert (serInstance.getAutoReset() === autoReset) + val obj = ("Hello", "World") + def serializeObjects(): Array[Byte] = { + val baos = new ByteArrayOutputStream() + val serStream = serInstance.serializeStream(baos) + serStream.writeObject(obj) + serStream.writeObject(obj) + serStream.close() + baos.toByteArray + } + val output1: Array[Byte] = serializeObjects() + val output2: Array[Byte] = serializeObjects() + assert (output1 === output2) + } + + // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling + // reference-tracking would lead to corrupted output when serializer instances are re-used + for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) { + test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") { + testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking) + } + } }