diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 9ce64d41fbc4006caf468fe55423fe3b17975129..dc7aa99738c17cb04467717dfac90d4209c86f25 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -158,7 +158,13 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() - kryo.writeClassAndObject(output, t) + try { + kryo.writeClassAndObject(output, t) + } catch { + case e: KryoException if e.getMessage.startsWith("Buffer overflow") => + throw new SparkException(s"Kryo serialization failed: ${e.getMessage}. To avoid this, " + + "increase spark.kryoserializer.buffer.max.mb value.") + } ByteBuffer.wrap(output.toBytes) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 523d89820744796c9fa856445cd2758a3a0f91e9..6198df84fab3da05a196eca87d49a7194ab07b91 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -261,6 +261,20 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) } } + + test("serialization buffer overflow reporting") { + import org.apache.spark.SparkException + val kryoBufferMaxProperty = "spark.kryoserializer.buffer.max.mb" + + val largeObject = (1 to 1000000).toArray + + val conf = new SparkConf(false) + conf.set(kryoBufferMaxProperty, "1") + + val ser = new KryoSerializer(conf).newInstance() + val thrown = intercept[SparkException](ser.serialize(largeObject)) + assert(thrown.getMessage.contains(kryoBufferMaxProperty)) + } }