diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 048e0d0186594b198314b4968a68ff345388b7dd..5e45b375ddd452ea130f701afb47c8fc388e4549 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -141,7 +141,7 @@ private[spark] class HighlyCompressedMapStatus private (
     private[this] var numNonEmptyBlocks: Int,
     private[this] var emptyBlocks: RoaringBitmap,
     private[this] var avgSize: Long,
-    @transient private var hugeBlockSizes: Map[Int, Byte])
+    private var hugeBlockSizes: Map[Int, Byte])
   extends MapStatus with Externalizable {

   // loc could be null when the default constructor is called during deserialization
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index e15166d11c243d4975924d3e31cae06116f48b87..4f03e54e304f6f04fd36bbb3f4c847b7c2e79a32 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -175,6 +175,7 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(None.getClass)
     kryo.register(Nil.getClass)
     kryo.register(Utils.classForName("scala.collection.immutable.$colon$colon"))
+    kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(classOf[ArrayBuffer[Any]])

     kryo.setClassLoader(classLoader)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 3ec37f674c77b82f0c58b031bc81ebd7e8ac5d05..e6120139f4958c8e04ad753786e5d4b5f733851a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -24,9 +24,9 @@ import scala.util.Random
 import org.mockito.Mockito._
 import org.roaringbitmap.RoaringBitmap

-import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.internal.config
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
 import org.apache.spark.storage.BlockManagerId

 class MapStatusSuite extends SparkFunSuite {
@@ -154,4 +154,18 @@ class MapStatusSuite extends SparkFunSuite {
       case part => assert(status2.getSizeForBlock(part) >= sizes(part))
     }
   }
+
+  test("SPARK-21133 HighlyCompressedMapStatus#writeExternal throws NPE") {
+    val conf = new SparkConf()
+      .set("spark.serializer", classOf[KryoSerializer].getName)
+      .setMaster("local")
+      .setAppName("SPARK-21133")
+    val sc = new SparkContext(conf)
+    try {
+      val count = sc.parallelize(0 until 3000, 10).repartition(2001).collect().length
+      assert(count === 3000)
+    } finally {
+      sc.stop()
+    }
+  }
 }
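
Context for the @transient removal (not part of the patch, and one plausible reading of the bug): Kryo's default FieldSerializer skips @transient fields, so a Kryo round-trip can leave hugeBlockSizes null, and a later Java/Externalizable serialization then dereferences it in writeExternal. The standalone sketch below reproduces that interaction under those assumptions, using a hypothetical Status class in place of HighlyCompressedMapStatus:

import java.io.{ByteArrayOutputStream, Externalizable, ObjectInput, ObjectOutput, ObjectOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical stand-in for HighlyCompressedMapStatus, reduced to the one
// field that matters. With @transient, Kryo's FieldSerializer writes nothing
// for `sizes`, so a Kryo round-trip yields an instance with sizes == null.
class Status(@transient private var sizes: Map[Int, Byte]) extends Externalizable {
  def this() = this(null) // no-arg constructor required by Externalizable

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(sizes.size) // NPE here when sizes is null
    sizes.foreach { case (k, v) => out.writeInt(k); out.writeByte(v) }
  }

  override def readExternal(in: ObjectInput): Unit = {
    sizes = (0 until in.readInt()).map(_ => in.readInt() -> in.readByte()).toMap
  }
}

object Npe {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)

    // Kryo round-trip: the @transient field is skipped and left null.
    val out = new Output(1024)
    kryo.writeObject(out, new Status(Map(0 -> 1.toByte)))
    val restored = kryo.readObject(new Input(out.toBytes), classOf[Status])

    // Re-serializing with Java serialization calls writeExternal, which
    // dereferences the null field, mirroring the shuffle-status NPE.
    new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(restored)
  }
}

Once @transient is removed, Kryo serializes the map field itself, and the new Map$EmptyMap$ registration presumably covers the common empty-map case when Kryo class registration is required.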