Skip to content
Snippets Groups Projects
Commit d4cd5deb authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Fix for Kryo Serializer

parent c3816de5
No related branches found
No related tags found
No related merge requests found
@@ -26,7 +26,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.serializer.Serializer
import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
/**
@@ -333,7 +333,18 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
// NOTE(review): this is a diff excerpt, not a complete method — the enclosing
// function (inside ExternalAppendOnlyMap, per the hunk header) opens before this
// span and its try/catch closes after it. Line order below preserves the diff:
// the first `deserializeStream = ...` assignment is the PRE-fix (removed) line;
// the `newInputStream` match and the second assignment are the fix.
if (!eof) {
try {
// A serializer batch boundary has been reached: a fresh deserialization
// stream must be opened for the next batch of objects.
if (objectsRead == serializerBatchSize) {
// Pre-fix behavior (removed by this commit): rewrapping compressedStream
// directly loses any bytes Kryo has already buffered ahead — TODO confirm
// against the commit's parent (c3816de5).
deserializeStream = ser.deserializeStream(compressedStream)
val newInputStream = deserializeStream match {
case stream: KryoDeserializationStream =>
// Kryo's serializer stores an internal buffer that pre-fetches from the underlying
// stream. We need to capture this buffer and feed it to the new serialization
// stream so that the bytes are not lost.
val kryoInput = stream.input
// Unread bytes still sitting in Kryo's internal Input buffer.
val remainingBytes = kryoInput.limit() - kryoInput.position()
val extraBuf = kryoInput.readBytes(remainingBytes)
// Replay the captured bytes first, then continue with the live stream,
// so the new deserialization stream sees a contiguous byte sequence.
new SequenceInputStream(new ByteArrayInputStream(extraBuf), compressedStream)
// Non-Kryo serializers do no read-ahead buffering here, so the raw
// compressed stream can be reused as-is.
case _ => compressedStream
}
deserializeStream = ser.deserializeStream(newInputStream)
// Reset the per-batch counter for the batch we are about to read.
objectsRead = 0
}
objectsRead += 1
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.