Skip to content
Snippets Groups Projects
Commit 58d4f6c8 authored by Matei Zaharia
Browse files

Merge pull request #157 from rxin/kryo

Three Kryo-related changes:

1. Call Kryo's setReferences before invoking the user-specified Kryo registrator, so that the user-specified registrator can override the default setting.

2. Register more internal classes (MapStatus, BlockManagerId).

3. Slightly refactored the internal class registration to allocate less memory.
parents 3efc0195 c845611f
No related branches found
No related tags found
No related merge requests found
...@@ -27,13 +27,17 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar} ...@@ -27,13 +27,17 @@ import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
import org.apache.spark.{SerializableWritable, Logging} import org.apache.spark.{SerializableWritable, Logging}
import org.apache.spark.broadcast.HttpBroadcast import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId} import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage._
/** /**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]]. * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
*/ */
class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging { class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
private val bufferSize = {
System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize) def newKryoOutput() = new KryoOutput(bufferSize)
...@@ -42,21 +46,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging ...@@ -42,21 +46,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
val kryo = instantiator.newKryo() val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader val classLoader = Thread.currentThread.getContextClassLoader
val blockId = TestBlockId("1") // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Register some commonly used classes // Do this before we invoke the user registrator so the user registrator can override this.
val toRegister: Seq[AnyRef] = Seq( kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
ByteBuffer.allocate(1),
StorageLevel.MEMORY_ONLY, for (cls <- KryoSerializer.toRegister) kryo.register(cls)
PutBlock(blockId, ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
GotBlock(blockId, ByteBuffer.allocate(1)),
GetBlock(blockId),
1 to 10,
1 until 10,
1L to 10L,
1L until 10L
)
for (obj <- toRegister) kryo.register(obj.getClass)
// Allow sending SerializableWritable // Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
...@@ -78,10 +72,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging ...@@ -78,10 +72,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
new AllScalaRegistrar().apply(kryo) new AllScalaRegistrar().apply(kryo)
kryo.setClassLoader(classLoader) kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
kryo kryo
} }
...@@ -165,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ ...@@ -165,3 +155,21 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
trait KryoRegistrator { trait KryoRegistrator {
def registerClasses(kryo: Kryo) def registerClasses(kryo: Kryo)
} }
/**
 * Companion object holding the list of classes that are registered with each
 * Kryo instance via `kryo.register`. The list is built once, when this object
 * is initialized, rather than on every Kryo instantiation.
 */
private[serializer] object KryoSerializer {
  // Commonly used classes, in the order they are handed to `kryo.register`.
  // Keep the order stable: the list is iterated front to back.
  private val toRegister: Seq[Class[_]] = {
    // Runtime class of a freshly allocated byte buffer.
    val byteBufferClass: Class[_] = ByteBuffer.allocate(1).getClass

    // Storage/scheduler classes plus raw byte arrays.
    val sparkClasses = Seq[Class[_]](
      classOf[StorageLevel],
      classOf[PutBlock],
      classOf[GotBlock],
      classOf[GetBlock],
      classOf[MapStatus],
      classOf[BlockManagerId],
      classOf[Array[Byte]]
    )

    // Runtime classes of Int and Long ranges, both inclusive (`to`) and
    // exclusive (`until`), obtained from sample instances.
    val rangeClasses = Seq[Class[_]](
      (1 to 10).getClass,
      (1 until 10).getClass,
      (1L to 10L).getClass,
      (1L until 10L).getClass
    )

    (byteBufferClass +: sparkClasses) ++ rangeClasses
  }
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment