Commit 320418f7 authored by Reynold Xin

Merge pull request #49 from mateiz/kryo-fix-2

Fix Chill serialization of Range objects

It used to write out each element one by one, creating very large objects.
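
For context, a minimal sketch of the effect this fix targets, mirroring the regression test added below (it assumes a Spark build that already contains this patch):

import org.apache.spark.serializer.KryoSerializer

// A Range is now written via its bounds and step rather than element by element,
// so even a million-element range serializes to a handful of bytes.
val ser = (new KryoSerializer).newInstance()
val bytes = ser.serialize(1 to 1000000)
assert(bytes.limit < 100)  // before the fix this buffer would have been far larger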
parents 215238cb c84c2052
@@ -23,7 +23,7 @@ import java.io.{EOFException, InputStream, OutputStream}
 import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
 import com.esotericsoftware.kryo.{KryoException, Kryo}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.twitter.chill.ScalaKryoInstantiator
+import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}
 import org.apache.spark.{SerializableWritable, Logging}
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
@@ -39,7 +39,7 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
   def newKryoOutput() = new KryoOutput(bufferSize)
 
   def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
+    val instantiator = new EmptyScalaKryoInstantiator
     val kryo = instantiator.newKryo()
     val classLoader = Thread.currentThread.getContextClassLoader
@@ -49,7 +49,11 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       StorageLevel.MEMORY_ONLY,
       PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
       GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
+      GetBlock("1"),
+      1 to 10,
+      1 until 10,
+      1L to 10L,
+      1L until 10L
     )
 
     for (obj <- toRegister) kryo.register(obj.getClass)
@@ -69,6 +73,10 @@ class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging
       case _: Exception => println("Failed to register spark.kryo.registrator")
     }
 
+    // Register Chill's classes; we do this after our ranges and the user's own classes to let
+    // our code override the generic serializers in Chill for things like Seq
+    new AllScalaRegistrar().apply(kryo)
+
     kryo.setClassLoader(classLoader)
 
     // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
......
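
The new Chill wiring above can also be exercised in isolation: start from a bare Kryo via EmptyScalaKryoInstantiator, register Spark's and the user's classes first, then layer Chill's standard Scala registrations on top. A standalone sketch of the same pattern (the class being registered is just an example):

import com.twitter.chill.{EmptyScalaKryoInstantiator, AllScalaRegistrar}

// A Kryo with no Scala-specific registrations yet
val kryo = (new EmptyScalaKryoInstantiator).newKryo()
// Register our own classes first (here, the concrete Range subclass)...
kryo.register((1 to 10).getClass)
// ...then apply Chill's standard Scala registrations
new AllScalaRegistrar().apply(kryo)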
@@ -103,6 +103,27 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
     check(List(mutable.HashMap("one" -> 1, "two" -> 2),mutable.HashMap(1->"one",2->"two",3->"three")))
   }
 
test("ranges") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
// Check that very long ranges don't get written one element at a time
assert(ser.serialize(t).limit < 100)
}
check(1 to 1000000)
check(1 to 1000000 by 2)
check(1 until 1000000)
check(1 until 1000000 by 2)
check(1L to 1000000L)
check(1L to 1000000L by 2L)
check(1L until 1000000L)
check(1L until 1000000L by 2L)
check(1.0 to 1000000.0 by 1.0)
check(1.0 to 1000000.0 by 2.0)
check(1.0 until 1000000.0 by 1.0)
check(1.0 until 1000000.0 by 2.0)
}
test("custom registrator") { test("custom registrator") {
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName) System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
......
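
The "custom registrator" test above exercises the spark.kryo.registrator hook shown in the serializer diff, which runs before Chill's AllScalaRegistrar is applied. For reference, a minimal sketch of how a user-defined registrator plugs in (MyClass and MyRegistrator are illustrative names, echoing the suite's own MyRegistrator):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyClass(val x: Int)

class MyRegistrator extends KryoRegistrator {
  // Invoked by KryoSerializer.newKryo() for classes the user wants registered
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass])
  }
}

// Select Kryo and point it at the registrator before creating the SparkContext
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)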