Skip to content
Snippets Groups Projects
Commit 882d0691 authored by Reynold Xin
Browse files

Fixed the bug in variable encoding for longs.

parent 1e5c1781
No related branches found
No related tags found
No related merge requests found
......@@ -82,7 +82,7 @@ class IntAggMsgSerializer extends Serializer {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Int]]
writeLong(msg.vid)
writeInt(msg.data)
writeUnsignedVarInt(msg.data)
this
}
}
......@@ -90,7 +90,7 @@ class IntAggMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val b = readInt()
val b = readUnsignedVarInt()
new AggregationMsg[Int](a, b).asInstanceOf[T]
}
}
......@@ -104,16 +104,16 @@ class LongAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Long]]
writeLong(msg.vid)
writeLong(msg.data)
writeVarLong(msg.vid, optimizePositive = false)
writeVarLong(msg.data, optimizePositive = true)
this
}
}
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
override def readObject[T](): T = {
val a = readLong()
val b = readLong()
val a = readVarLong(optimizePositive = false)
val b = readVarLong(optimizePositive = true)
new AggregationMsg[Long](a, b).asInstanceOf[T]
}
}
......@@ -128,7 +128,7 @@ class DoubleAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Double]]
writeUnsignedVarLong(msg.vid)
writeVarLong(msg.vid, optimizePositive = false)
writeDouble(msg.data)
this
}
......@@ -136,7 +136,7 @@ class DoubleAggMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = {
val a = readUnsignedVarLong()
val a = readVarLong(optimizePositive = false)
val b = readDouble()
new AggregationMsg[Double](a, b).asInstanceOf[T]
}
......@@ -160,21 +160,86 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
}
def writeUnsignedVarInt(value: Int) {
var v = value
while ((v & 0xFFFFFF80) != 0L) {
s.write((v & 0x7F) | 0x80)
v = v >>> 7
if ((value >>> 7) == 0) {
s.write(value.toInt)
} else if ((value >>> 14) == 0) {
s.write((value & 0x7F) | 0x80)
s.write(value >>> 7)
} else if ((value >>> 21) == 0) {
s.write((value & 0x7F) | 0x80)
s.write(value >>> 7 | 0x80)
s.write(value >>> 14)
} else if ((value >>> 28) == 0) {
s.write((value & 0x7F) | 0x80)
s.write(value >>> 7 | 0x80)
s.write(value >>> 14 | 0x80)
s.write(value >>> 21)
} else {
s.write((value & 0x7F) | 0x80)
s.write(value >>> 7 | 0x80)
s.write(value >>> 14 | 0x80)
s.write(value >>> 21 | 0x80)
s.write(value >>> 28)
}
s.write(v & 0x7F)
}
def writeUnsignedVarLong(value: Long) {
var v = value
while ((v & 0xFFFFFF80) != 0L) {
def writeVarLong(value: Long, optimizePositive: Boolean) {
val v = if (!optimizePositive) (value << 1) ^ (value >> 63) else value
if ((v >>> 7) == 0) {
s.write(v.toInt)
} else if ((v >>> 14) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7).toInt)
} else if ((v >>> 21) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14).toInt)
} else if ((v >>> 28) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
v = v >>> 7
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21).toInt)
} else if ((v >>> 35) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21 | 0x80).toInt)
s.write((v >>> 28).toInt)
} else if ((v >>> 42) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21 | 0x80).toInt)
s.write((v >>> 28 | 0x80).toInt)
s.write((v >>> 35).toInt)
} else if ((v >>> 49) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21 | 0x80).toInt)
s.write((v >>> 28 | 0x80).toInt)
s.write((v >>> 35 | 0x80).toInt)
s.write((v >>> 42).toInt)
} else if ((v >>> 56) == 0) {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21 | 0x80).toInt)
s.write((v >>> 28 | 0x80).toInt)
s.write((v >>> 35 | 0x80).toInt)
s.write((v >>> 42 | 0x80).toInt)
s.write((v >>> 49).toInt)
} else {
s.write(((v & 0x7F) | 0x80).toInt)
s.write((v >>> 7 | 0x80).toInt)
s.write((v >>> 14 | 0x80).toInt)
s.write((v >>> 21 | 0x80).toInt)
s.write((v >>> 28 | 0x80).toInt)
s.write((v >>> 35 | 0x80).toInt)
s.write((v >>> 42 | 0x80).toInt)
s.write((v >>> 49 | 0x80).toInt)
s.write((v >>> 56).toInt)
}
s.write((v & 0x7F).toInt)
}
def writeLong(v: Long) {
......@@ -188,8 +253,8 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
s.write(v.toInt)
}
def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
//def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
//def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
override def flush(): Unit = s.flush()
......@@ -225,7 +290,8 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
value | (b << i)
}
def readUnsignedVarLong(): Long = {
def readVarLong(optimizePositive: Boolean): Long = {
// TODO: unroll the while loop.
var value: Long = 0L
var i: Int = 0
def readOrThrow(): Int = {
......@@ -235,12 +301,13 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
}
var b: Int = readOrThrow()
while ((b & 0x80) != 0) {
value |= (b & 0x7F) << i
value |= (b & 0x7F).toLong << i
i += 7
if (i > 63) throw new IllegalArgumentException("Variable length quantity is too long")
b = readOrThrow()
}
value | (b << i)
val ret = value | (b.toLong << i)
if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret
}
def readLong(): Long = {
......@@ -256,8 +323,8 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
(s.read() & 0xFF)
}
def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
//def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
//def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
override def close(): Unit = s.close()
}
......
......@@ -15,13 +15,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator")
test("TestVertexBroadcastMessageInt") {
val outMsg = new VertexBroadcastMsg[Int](3,4,5)
test("IntVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Int](3, 4, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject()
......@@ -36,13 +36,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
test("TestVertexBroadcastMessageLong") {
val outMsg = new VertexBroadcastMsg[Long](3,4,5)
test("LongVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Long](3, 4, 5)
val bout = new ByteArrayOutputStream
val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject()
......@@ -57,13 +57,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
test("TestVertexBroadcastMessageDouble") {
val outMsg = new VertexBroadcastMsg[Double](3,4,5.0)
test("DoubleVertexBroadcastMsgSerializer") {
val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject()
......@@ -78,13 +78,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
test("TestAggregationMessageInt") {
val outMsg = new AggregationMsg[Int](4,5)
test("IntAggMsgSerializer") {
val outMsg = new AggregationMsg[Int](4, 5)
val bout = new ByteArrayOutputStream
val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Int] = inStrm.readObject()
......@@ -99,13 +99,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
test("TestAggregationMessageLong") {
val outMsg = new AggregationMsg[Long](4,5)
test("LongAggMsgSerializer") {
val outMsg = new AggregationMsg[Long](4, 1L << 32)
val bout = new ByteArrayOutputStream
val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Long] = inStrm.readObject()
......@@ -120,13 +120,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext {
}
}
test("TestAggregationMessageDouble") {
val outMsg = new AggregationMsg[Double](4,5.0)
test("DoubleAggMsgSerializer") {
val outMsg = new AggregationMsg[Double](4, 5.0)
val bout = new ByteArrayOutputStream
val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout)
outStrm.writeObject(outMsg)
outStrm.writeObject(outMsg)
bout.flush
bout.flush()
val bin = new ByteArrayInputStream(bout.toByteArray)
val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin)
val inMsg1: AggregationMsg[Double] = inStrm.readObject()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment