Skip to content
Snippets Groups Projects
Commit c845611f authored by Reynold Xin's avatar Reynold Xin
Browse files

Moved the Spark internal class registration for Kryo into an object, and added...

Moved the Spark internal class registration for Kryo into an object, and added more classes (e.g. MapStatus, BlockManagerId) to the registration.
parent 7c5f70d8
No related branches found
No related tags found
No related merge requests found
......@@ -27,11 +27,11 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId}
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
/**
* A Spark serializer that uses the
* [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/
class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
......@@ -50,21 +50,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
// Do this before we invoke the user registrator so the user registrator can override this.
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
val blockId = TestBlockId("1")
// Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq(
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY,
PutBlock(blockId, ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock(blockId, ByteBuffer.allocate(1)),
GetBlock(blockId),
1 to 10,
1 until 10,
1L to 10L,
1L until 10L
)
for (obj <- toRegister) kryo.register(obj.getClass)
for (cls <- KryoSerializer.toRegister) kryo.register(cls)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
......@@ -169,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
trait KryoRegistrator {
def registerClasses(kryo: Kryo)
}
private[serializer] object KryoSerializer {
// Commonly used classes.
private val toRegister: Seq[Class[_]] = Seq(
ByteBuffer.allocate(1).getClass,
classOf[StorageLevel],
classOf[PutBlock],
classOf[GotBlock],
classOf[GetBlock],
classOf[MapStatus],
classOf[BlockManagerId],
classOf[Array[Byte]],
(1 to 10).getClass,
(1 until 10).getClass,
(1L to 10L).getClass,
(1L until 10L).getClass
)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment