Skip to content
Snippets Groups Projects
Commit 7c5f70d8 authored by Reynold Xin's avatar Reynold Xin
Browse files

Call Kryo setReferences before calling user specified Kryo registrator.

parent 3efc0195
No related branches found
No related tags found
No related merge requests found
...@@ -30,10 +30,14 @@ import org.apache.spark.broadcast.HttpBroadcast ...@@ -30,10 +30,14 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId} import org.apache.spark.storage.{GetBlock,GotBlock, PutBlock, StorageLevel, TestBlockId}
/** /**
* A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]]. * A Spark serializer that uses the
* [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
*/ */
class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging { class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
private val bufferSize = {
System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
}
def newKryoOutput() = new KryoOutput(bufferSize) def newKryoOutput() = new KryoOutput(bufferSize)
...@@ -42,6 +46,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging ...@@ -42,6 +46,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
val kryo = instantiator.newKryo() val kryo = instantiator.newKryo()
val classLoader = Thread.currentThread.getContextClassLoader val classLoader = Thread.currentThread.getContextClassLoader
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops.
// Do this before we invoke the user registrator so the user registrator can override this.
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
val blockId = TestBlockId("1") val blockId = TestBlockId("1")
// Register some commonly used classes // Register some commonly used classes
val toRegister: Seq[AnyRef] = Seq( val toRegister: Seq[AnyRef] = Seq(
...@@ -78,10 +86,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging ...@@ -78,10 +86,6 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
new AllScalaRegistrar().apply(kryo) new AllScalaRegistrar().apply(kryo)
kryo.setClassLoader(classLoader) kryo.setClassLoader(classLoader)
// Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
kryo kryo
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment