Skip to content
Snippets Groups Projects
Commit 1f1fccc5 authored by Imran Rashid's avatar Imran Rashid Committed by Reynold Xin
Browse files

[SPARK-5949] HighlyCompressedMapStatus needs more classes registered w/ kryo

https://issues.apache.org/jira/browse/SPARK-5949

Author: Imran Rashid <irashid@cloudera.com>

Closes #4877 from squito/SPARK-5949_register_roaring_bitmap and squashes the following commits:

7e13316 [Imran Rashid] style style style
5f6bb6d [Imran Rashid] more style
709bfe0 [Imran Rashid] style
a5cb744 [Imran Rashid] update tests to cover both types of RoaringBitmapContainers
09610c6 [Imran Rashid] formatting
f9a0b7c [Imran Rashid] put primitive array registrations together
97beaf8 [Imran Rashid] SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo
parent 6c20f352
No related branches found
No related tags found
No related merge requests found
...@@ -20,22 +20,23 @@ package org.apache.spark.serializer ...@@ -20,22 +20,23 @@ package org.apache.spark.serializer
import java.io.{EOFException, InputStream, OutputStream} import java.io.{EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer import java.nio.ByteBuffer
import scala.reflect.ClassTag
import com.esotericsoftware.kryo.{Kryo, KryoException} import com.esotericsoftware.kryo.{Kryo, KryoException}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
import org.apache.spark._ import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock} import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._ import org.apache.spark.storage._
import org.apache.spark.util.BoundedPriorityQueue import org.apache.spark.util.BoundedPriorityQueue
import org.apache.spark.util.collection.CompactBuffer import org.apache.spark.util.collection.CompactBuffer
import scala.reflect.ClassTag
/** /**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
* *
...@@ -202,9 +203,17 @@ private[serializer] object KryoSerializer { ...@@ -202,9 +203,17 @@ private[serializer] object KryoSerializer {
classOf[GetBlock], classOf[GetBlock],
classOf[CompressedMapStatus], classOf[CompressedMapStatus],
classOf[HighlyCompressedMapStatus], classOf[HighlyCompressedMapStatus],
classOf[RoaringBitmap],
classOf[RoaringArray],
classOf[RoaringArray.Element],
classOf[Array[RoaringArray.Element]],
classOf[ArrayContainer],
classOf[BitmapContainer],
classOf[CompactBuffer[_]], classOf[CompactBuffer[_]],
classOf[BlockManagerId], classOf[BlockManagerId],
classOf[Array[Byte]], classOf[Array[Byte]],
classOf[Array[Short]],
classOf[Array[Long]],
classOf[BoundedPriorityQueue[_]], classOf[BoundedPriorityQueue[_]],
classOf[SparkConf] classOf[SparkConf]
) )
......
...@@ -23,9 +23,10 @@ import scala.reflect.ClassTag ...@@ -23,9 +23,10 @@ import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Kryo
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.apache.spark.{SparkConf, SharedSparkContext} import org.apache.spark.{SharedSparkContext, SparkConf}
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._ import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
class KryoSerializerSuite extends FunSuite with SharedSparkContext { class KryoSerializerSuite extends FunSuite with SharedSparkContext {
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
...@@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { ...@@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
ser.newInstance().deserialize[ClassLoaderTestingObject](bytes) ser.newInstance().deserialize[ClassLoaderTestingObject](bytes)
} }
} }
test("registration of HighlyCompressedMapStatus") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")
// these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16
// values, and they use a bitmap (dense) if they have more than 4096 values, and an
// array (sparse) if they use less. So we just create two cases, one sparse and one dense.
// and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly
// empty blocks
val ser = new KryoSerializer(conf).newInstance()
val denseBlockSizes = new Array[Long](5000)
val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L)
Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes =>
ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes))
}
}
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment