Skip to content
Snippets Groups Projects
Commit 1e5c1781 authored by Reynold Xin's avatar Reynold Xin
Browse files

Use variable encoding for ints, longs, and doubles in the specialized serializers.

parent 143c01db
No related branches found
No related tags found
No related merge requests found
...@@ -284,6 +284,9 @@ object Analytics extends Logging { ...@@ -284,6 +284,9 @@ object Analytics extends Logging {
pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
} }
logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
Thread.sleep(1000000)
sc.stop() sc.stop()
} }
......
...@@ -128,7 +128,7 @@ class DoubleAggMsgSerializer extends Serializer { ...@@ -128,7 +128,7 @@ class DoubleAggMsgSerializer extends Serializer {
override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
def writeObject[T](t: T) = { def writeObject[T](t: T) = {
val msg = t.asInstanceOf[AggregationMsg[Double]] val msg = t.asInstanceOf[AggregationMsg[Double]]
writeLong(msg.vid) writeUnsignedVarLong(msg.vid)
writeDouble(msg.data) writeDouble(msg.data)
this this
} }
...@@ -136,7 +136,7 @@ class DoubleAggMsgSerializer extends Serializer { ...@@ -136,7 +136,7 @@ class DoubleAggMsgSerializer extends Serializer {
override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
def readObject[T](): T = { def readObject[T](): T = {
val a = readLong() val a = readUnsignedVarLong()
val b = readDouble() val b = readDouble()
new AggregationMsg[Double](a, b).asInstanceOf[T] new AggregationMsg[Double](a, b).asInstanceOf[T]
} }
...@@ -159,6 +159,24 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial ...@@ -159,6 +159,24 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
s.write(v) s.write(v)
} }
def writeUnsignedVarInt(value: Int) {
var v = value
while ((v & 0xFFFFFF80) != 0L) {
s.write((v & 0x7F) | 0x80)
v = v >>> 7
}
s.write(v & 0x7F)
}
def writeUnsignedVarLong(value: Long) {
var v = value
while ((v & 0xFFFFFF80) != 0L) {
s.write(((v & 0x7F) | 0x80).toInt)
v = v >>> 7
}
s.write((v & 0x7F).toInt)
}
def writeLong(v: Long) { def writeLong(v: Long) {
s.write((v >>> 56).toInt) s.write((v >>> 56).toInt)
s.write((v >>> 48).toInt) s.write((v >>> 48).toInt)
...@@ -170,9 +188,8 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial ...@@ -170,9 +188,8 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
s.write(v.toInt) s.write(v.toInt)
} }
def writeDouble(v: Double) { def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
writeLong(java.lang.Double.doubleToLongBits(v)) //def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
}
override def flush(): Unit = s.flush() override def flush(): Unit = s.flush()
...@@ -190,6 +207,42 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser ...@@ -190,6 +207,42 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
(first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF) (first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF)
} }
def readUnsignedVarInt(): Int = {
var value: Int = 0
var i: Int = 0
def readOrThrow(): Int = {
val in = s.read()
if (in < 0) throw new java.io.EOFException
in & 0xFF
}
var b: Int = readOrThrow()
while ((b & 0x80) != 0) {
value |= (b & 0x7F) << i
i += 7
if (i > 35) throw new IllegalArgumentException("Variable length quantity is too long")
b = readOrThrow()
}
value | (b << i)
}
def readUnsignedVarLong(): Long = {
var value: Long = 0L
var i: Int = 0
def readOrThrow(): Int = {
val in = s.read()
if (in < 0) throw new java.io.EOFException
in & 0xFF
}
var b: Int = readOrThrow()
while ((b & 0x80) != 0) {
value |= (b & 0x7F) << i
i += 7
if (i > 63) throw new IllegalArgumentException("Variable length quantity is too long")
b = readOrThrow()
}
value | (b << i)
}
def readLong(): Long = { def readLong(): Long = {
val first = s.read() val first = s.read()
if (first < 0) throw new EOFException() if (first < 0) throw new EOFException()
...@@ -203,7 +256,8 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser ...@@ -203,7 +256,8 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
(s.read() & 0xFF) (s.read() & 0xFF)
} }
def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong()) def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
//def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
override def close(): Unit = s.close() override def close(): Unit = s.close()
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment