From 1e5c17812de073dd7fe890b99f921489895dae59 Mon Sep 17 00:00:00 2001
From: Reynold Xin <rxin@apache.org>
Date: Tue, 12 Nov 2013 15:30:27 -0800
Subject: [PATCH] Use variable encoding for ints, longs, and doubles in the
 specialized serializers.

---
 .../org/apache/spark/graph/Analytics.scala    |  3 +
 .../apache/spark/graph/impl/Serializers.scala | 66 +++++++++++++++++--
 2 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
index 6beaea07fa..8320b663a5 100644
--- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala
@@ -284,6 +284,9 @@ object Analytics extends Logging {
            pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname)
          }
          logInfo("GRAPHX: Runtime:    " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds")
+
+
+         Thread.sleep(1000000)
          sc.stop()
        }
 
diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
index c56bbc8aee..8c366f5fe0 100644
--- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala
@@ -128,7 +128,7 @@ class DoubleAggMsgSerializer extends Serializer {
     override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) {
       def writeObject[T](t: T) = {
         val msg = t.asInstanceOf[AggregationMsg[Double]]
-        writeLong(msg.vid)
+        writeUnsignedVarLong(msg.vid)
         writeDouble(msg.data)
         this
       }
@@ -136,7 +136,7 @@ class DoubleAggMsgSerializer extends Serializer {
 
     override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) {
       def readObject[T](): T = {
-        val a = readLong()
+        val a = readUnsignedVarLong()
         val b = readDouble()
         new AggregationMsg[Double](a, b).asInstanceOf[T]
       }
@@ -159,6 +159,24 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
     s.write(v)
   }
 
+  def writeUnsignedVarInt(value: Int) {
+    var v = value
+    while ((v & 0xFFFFFF80) != 0L) {
+      s.write((v & 0x7F) | 0x80)
+      v = v >>> 7
+    }
+    s.write(v & 0x7F)
+  }
+
+  def writeUnsignedVarLong(value: Long) {
+    var v = value
+    while ((v & 0xFFFFFF80) != 0L) {
+      s.write(((v & 0x7F) | 0x80).toInt)
+      v = v >>> 7
+    }
+    s.write((v & 0x7F).toInt)
+  }
+
   def writeLong(v: Long) {
     s.write((v >>> 56).toInt)
     s.write((v >>> 48).toInt)
@@ -170,9 +188,8 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial
     s.write(v.toInt)
   }
 
-  def writeDouble(v: Double) {
-    writeLong(java.lang.Double.doubleToLongBits(v))
-  }
+  def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v))
+  //def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v))
 
   override def flush(): Unit = s.flush()
 
@@ -190,6 +207,42 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
     (first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF)
   }
 
+  def readUnsignedVarInt(): Int = {
+    var value: Int = 0
+    var i: Int = 0
+    def readOrThrow(): Int = {
+      val in = s.read()
+      if (in < 0) throw new java.io.EOFException
+      in & 0xFF
+    }
+    var b: Int = readOrThrow()
+    while ((b & 0x80) != 0) {
+      value |= (b & 0x7F) << i
+      i += 7
+      if (i > 35) throw new IllegalArgumentException("Variable length quantity is too long")
+      b = readOrThrow()
+    }
+    value | (b << i)
+  }
+
+  def readUnsignedVarLong(): Long = {
+    var value: Long = 0L
+    var i: Int = 0
+    def readOrThrow(): Int = {
+      val in = s.read()
+      if (in < 0) throw new java.io.EOFException
+      in & 0xFF
+    }
+    var b: Int = readOrThrow()
+    while ((b & 0x80) != 0) {
+      value |= (b & 0x7F) << i
+      i += 7
+      if (i > 63) throw new IllegalArgumentException("Variable length quantity is too long")
+      b = readOrThrow()
+    }
+    value | (b << i)
+  }
+
   def readLong(): Long = {
     val first = s.read()
     if (first < 0) throw new EOFException()
@@ -203,7 +256,8 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser
       (s.read() & 0xFF)
   }
 
-  def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
+  def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong())
+  //def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong())
 
   override def close(): Unit = s.close()
 }
-- 
GitLab