diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 5e264b48ddc223906a03d2659ec623041487507e..1f794379f74f34d834c9bc4720380ab4d2f07386 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -41,9 +41,11 @@ class BitSet(numBits: Int) { val wordIndex = bitIndex >> 6 // divide by 64 var i = 0 while(i < wordIndex) { words(i) = -1; i += 1 } - // Set the remaining bits - val mask = ~(-1L << (bitIndex & 0x3f)) - words(wordIndex) |= mask + if(wordIndex < words.size) { + // Set the remaining bits (note that the mask could still be zero) + val mask = ~(-1L << (bitIndex & 0x3f)) + words(wordIndex) |= mask + } } diff --git a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala index 6beaea07fa060094aa0ad217a1240a12c0df7f26..8320b663a559d793e15d609b94ba3f09a137ca6f 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Analytics.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Analytics.scala @@ -284,6 +284,9 @@ object Analytics extends Logging { pr.vertices.map{case (id, r) => id + "\t" + r}.saveAsTextFile(outFname) } logInfo("GRAPHX: Runtime: " + ((System.currentTimeMillis - startTime)/1000.0) + " seconds") + + + Thread.sleep(1000000) sc.stop() } diff --git a/graph/src/main/scala/org/apache/spark/graph/Graph.scala b/graph/src/main/scala/org/apache/spark/graph/Graph.scala index 82bdb099a5f09bd9c23ee176a20220a724e5ed4f..418440cded69bc5f525907c2e6cfc2e9e5c79160 100644 --- a/graph/src/main/scala/org/apache/spark/graph/Graph.scala +++ b/graph/src/main/scala/org/apache/spark/graph/Graph.scala @@ -406,6 +406,30 @@ object Graph { } + + /** + * Construct a graph from a collection attributed vertices and + * edges. Duplicate vertices are combined using the `mergeFunc` and + * vertices found in the edge collection but not in the input + * vertices are the default attribute `defautVertexAttr`. + * + * @tparam VD the vertex attribute type + * @tparam ED the edge attribute type + * @param vertices the "set" of vertices and their attributes + * @param edges the collection of edges in the graph + * @param defaultVertexAttr the default vertex attribute to use for + * vertices that are mentioned in `edges` but not in `vertices + * @param mergeFunc the function used to merge duplicate vertices + * in the `vertices` collection. + * + */ + def apply[VD: ClassManifest, ED: ClassManifest]( + vertices: RDD[(Vid,VD)], + edges: RDD[Edge[ED]], + defaultVertexAttr: VD): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr, (a,b) => a) + } + /** * Construct a graph from a collection attributed vertices and * edges. Duplicate vertices are combined using the `mergeFunc` and diff --git a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala index 401d5f29bc134e04b2a8c036f7eeb8dad9e2b314..507370539e31af80055ec38c6e39037bf7b7b3ea 100644 --- a/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala +++ b/graph/src/main/scala/org/apache/spark/graph/VertexSetRDD.scala @@ -204,6 +204,31 @@ class VertexSetRDD[@specialized V: ClassManifest]( new VertexSetRDD[U](index, newValuesRDD) } // end of mapValues + /** + * Fill in missing values for all vertices in the index. + * + * @param missingValue the value to be used for vertices in the + * index that don't currently have values. + * @return A VertexSetRDD with a value for all vertices. + */ + def fillMissing(missingValue: V): VertexSetRDD[V] = { + val newValuesRDD: RDD[ (Array[V], BitSet) ] = + valuesRDD.zipPartitions(index.rdd){ (valuesIter, indexIter) => + val index = indexIter.next + assert(!indexIter.hasNext) + val (values, bs: BitSet) = valuesIter.next + assert(!valuesIter.hasNext) + // Allocate a new values array with missing value as the default + val newValues = Array.fill(values.size)(missingValue) + // Copy over the old values + bs.iterator.foreach { ind => newValues(ind) = values(ind) } + // Create a new bitset matching the keyset + val newBS = index.getBitSet + Iterator((newValues, newBS)) + } + new VertexSetRDD[V](index, newValuesRDD) + } + /** * Pass each vertex attribute along with the vertex id through a map * function and retain the original RDD's partitioning and index. @@ -380,7 +405,6 @@ class VertexSetRDD[@specialized V: ClassManifest]( // this vertex set then we use the much more efficient leftZipJoin case other: VertexSetRDD[_] if index == other.index => { leftZipJoin(other)(cleanF) - // @todo handle case where other is a VertexSetRDD with a different index } case _ => { val indexedOther: VertexSetRDD[W] = VertexSetRDD(other, index, cleanMerge) @@ -599,28 +623,24 @@ object VertexSetRDD { * can be used to build VertexSets over subsets of the vertices in * the input. */ - def makeIndex(keys: RDD[Vid], - partitioner: Option[Partitioner] = None): VertexSetIndex = { - // @todo: I don't need the boolean its only there to be the second type since I want to shuffle a single RDD - // Ugly hack :-(. In order to partition the keys they must have values. - val tbl = keys.mapPartitions(_.map(k => (k, false)), true) - // Shuffle the table (if necessary) - val shuffledTbl = partitioner match { - case None => { - if (tbl.partitioner.isEmpty) { - // @todo: I don't need the boolean its only there to be the second type of the shuffle. - new ShuffledRDD[Vid, Boolean, (Vid, Boolean)](tbl, Partitioner.defaultPartitioner(tbl)) - } else { tbl } - } - case Some(partitioner) => - tbl.partitionBy(partitioner) + def makeIndex(keys: RDD[Vid], partitionerOpt: Option[Partitioner] = None): VertexSetIndex = { + val partitioner = partitionerOpt match { + case Some(p) => p + case None => Partitioner.defaultPartitioner(keys) } - val index = shuffledTbl.mapPartitions( iter => { + val preAgg: RDD[(Vid, Unit)] = keys.mapPartitions( iter => { + val keys = new VertexIdToIndexMap + while(iter.hasNext) { keys.add(iter.next) } + keys.iterator.map(k => (k, ())) + }, true).partitionBy(partitioner) + + val index = preAgg.mapPartitions( iter => { val index = new VertexIdToIndexMap - for ( (k,_) <- iter ){ index.add(k) } + while(iter.hasNext) { index.add(iter.next._1) } Iterator(index) - }, true).cache + }, true).cache + new VertexSetIndex(index) } diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala index 693bb888bca265a9b73720c63968cd26e542d6e0..aababc366534c44c9334273d7bb42c8b20e64fbf 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/GraphImpl.scala @@ -9,6 +9,7 @@ import org.apache.spark.SparkContext._ import org.apache.spark.HashPartitioner import org.apache.spark.util.ClosureCleaner +import org.apache.spark.Partitioner import org.apache.spark.graph._ import org.apache.spark.graph.impl.GraphImpl._ import org.apache.spark.graph.impl.MsgRDDFunctions._ @@ -320,20 +321,21 @@ object GraphImpl { defaultVertexAttr: VD, mergeFunc: (VD, VD) => VD): GraphImpl[VD,ED] = { - val vtable = VertexSetRDD(vertices, mergeFunc) - /** - * @todo Verify that there are no edges that contain vertices - * that are not in vTable. This should probably be resolved: - * - * edges.flatMap{ e => Array((e.srcId, null), (e.dstId, null)) } - * .cogroup(vertices).map{ - * case (vid, _, attr) => - * if (attr.isEmpty) (vid, defaultValue) - * else (vid, attr) - * } - * - */ - val etable = createETable(edges) + vertices.cache + val etable = createETable(edges).cache + // Get the set of all vids, preserving partitions + val partitioner = Partitioner.defaultPartitioner(vertices) + val implicitVids = etable.flatMap { + case (pid, partition) => Array.concat(partition.srcIds, partition.dstIds) + }.map(vid => (vid, ())).partitionBy(partitioner) + val allVids = vertices.zipPartitions(implicitVids) { + (a, b) => a.map(_._1) ++ b.map(_._1) + } + // Index the set of all vids + val index = VertexSetRDD.makeIndex(allVids, Some(partitioner)) + // Index the vertices and fill in missing attributes with the default + val vtable = VertexSetRDD(vertices, index, mergeFunc).fillMissing(defaultVertexAttr) + val vid2pid = new Vid2Pid(etable, vtable.index) val localVidMap = createLocalVidMap(etable) new GraphImpl(vtable, vid2pid, localVidMap, etable) diff --git a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala index c56bbc8aee9776e44cd4d6f311d34d18f5763a24..2e768e85cfb47094f6fcf520aeeb0db0ab8640e1 100644 --- a/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala +++ b/graph/src/main/scala/org/apache/spark/graph/impl/Serializers.scala @@ -82,7 +82,7 @@ class IntAggMsgSerializer extends Serializer { def writeObject[T](t: T) = { val msg = t.asInstanceOf[AggregationMsg[Int]] writeLong(msg.vid) - writeInt(msg.data) + writeUnsignedVarInt(msg.data) this } } @@ -90,7 +90,7 @@ class IntAggMsgSerializer extends Serializer { override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { override def readObject[T](): T = { val a = readLong() - val b = readInt() + val b = readUnsignedVarInt() new AggregationMsg[Int](a, b).asInstanceOf[T] } } @@ -104,16 +104,16 @@ class LongAggMsgSerializer extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { val msg = t.asInstanceOf[AggregationMsg[Long]] - writeLong(msg.vid) - writeLong(msg.data) + writeVarLong(msg.vid, optimizePositive = false) + writeVarLong(msg.data, optimizePositive = true) this } } override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { override def readObject[T](): T = { - val a = readLong() - val b = readLong() + val a = readVarLong(optimizePositive = false) + val b = readVarLong(optimizePositive = true) new AggregationMsg[Long](a, b).asInstanceOf[T] } } @@ -128,7 +128,7 @@ class DoubleAggMsgSerializer extends Serializer { override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { def writeObject[T](t: T) = { val msg = t.asInstanceOf[AggregationMsg[Double]] - writeLong(msg.vid) + writeVarLong(msg.vid, optimizePositive = false) writeDouble(msg.data) this } @@ -136,7 +136,7 @@ class DoubleAggMsgSerializer extends Serializer { override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { def readObject[T](): T = { - val a = readLong() + val a = readVarLong(optimizePositive = false) val b = readDouble() new AggregationMsg[Double](a, b).asInstanceOf[T] } @@ -159,6 +159,89 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial s.write(v) } + def writeUnsignedVarInt(value: Int) { + if ((value >>> 7) == 0) { + s.write(value.toInt) + } else if ((value >>> 14) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7) + } else if ((value >>> 21) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14) + } else if ((value >>> 28) == 0) { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14 | 0x80) + s.write(value >>> 21) + } else { + s.write((value & 0x7F) | 0x80) + s.write(value >>> 7 | 0x80) + s.write(value >>> 14 | 0x80) + s.write(value >>> 21 | 0x80) + s.write(value >>> 28) + } + } + + def writeVarLong(value: Long, optimizePositive: Boolean) { + val v = if (!optimizePositive) (value << 1) ^ (value >> 63) else value + if ((v >>> 7) == 0) { + s.write(v.toInt) + } else if ((v >>> 14) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7).toInt) + } else if ((v >>> 21) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14).toInt) + } else if ((v >>> 28) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21).toInt) + } else if ((v >>> 35) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28).toInt) + } else if ((v >>> 42) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35).toInt) + } else if ((v >>> 49) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42).toInt) + } else if ((v >>> 56) == 0) { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42 | 0x80).toInt) + s.write((v >>> 49).toInt) + } else { + s.write(((v & 0x7F) | 0x80).toInt) + s.write((v >>> 7 | 0x80).toInt) + s.write((v >>> 14 | 0x80).toInt) + s.write((v >>> 21 | 0x80).toInt) + s.write((v >>> 28 | 0x80).toInt) + s.write((v >>> 35 | 0x80).toInt) + s.write((v >>> 42 | 0x80).toInt) + s.write((v >>> 49 | 0x80).toInt) + s.write((v >>> 56).toInt) + } + } + def writeLong(v: Long) { s.write((v >>> 56).toInt) s.write((v >>> 48).toInt) @@ -170,9 +253,8 @@ sealed abstract class ShuffleSerializationStream(s: OutputStream) extends Serial s.write(v.toInt) } - def writeDouble(v: Double) { - writeLong(java.lang.Double.doubleToLongBits(v)) - } + //def writeDouble(v: Double): Unit = writeUnsignedVarLong(java.lang.Double.doubleToLongBits(v)) + def writeDouble(v: Double): Unit = writeLong(java.lang.Double.doubleToLongBits(v)) override def flush(): Unit = s.flush() @@ -190,6 +272,44 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser (first & 0xFF) << 24 | (s.read() & 0xFF) << 16 | (s.read() & 0xFF) << 8 | (s.read() & 0xFF) } + def readUnsignedVarInt(): Int = { + var value: Int = 0 + var i: Int = 0 + def readOrThrow(): Int = { + val in = s.read() + if (in < 0) throw new java.io.EOFException + in & 0xFF + } + var b: Int = readOrThrow() + while ((b & 0x80) != 0) { + value |= (b & 0x7F) << i + i += 7 + if (i > 35) throw new IllegalArgumentException("Variable length quantity is too long") + b = readOrThrow() + } + value | (b << i) + } + + def readVarLong(optimizePositive: Boolean): Long = { + // TODO: unroll the while loop. + var value: Long = 0L + var i: Int = 0 + def readOrThrow(): Int = { + val in = s.read() + if (in < 0) throw new java.io.EOFException + in & 0xFF + } + var b: Int = readOrThrow() + while ((b & 0x80) != 0) { + value |= (b & 0x7F).toLong << i + i += 7 + if (i > 63) throw new IllegalArgumentException("Variable length quantity is too long") + b = readOrThrow() + } + val ret = value | (b.toLong << i) + if (!optimizePositive) (ret >>> 1) ^ -(ret & 1) else ret + } + def readLong(): Long = { val first = s.read() if (first < 0) throw new EOFException() @@ -203,6 +323,7 @@ sealed abstract class ShuffleDeserializationStream(s: InputStream) extends Deser (s.read() & 0xFF) } + //def readDouble(): Double = java.lang.Double.longBitsToDouble(readUnsignedVarLong()) def readDouble(): Double = java.lang.Double.longBitsToDouble(readLong()) override def close(): Unit = s.close() diff --git a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala index 9c22608554671e8b6f98bafbd59219074a1539fc..899048a17a6d22c4e9e1d2bf42276d9f2eb100b6 100644 --- a/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/GraphSuite.scala @@ -4,7 +4,7 @@ import org.scalatest.FunSuite import org.apache.spark.SparkContext import org.apache.spark.graph.LocalSparkContext._ - +import org.apache.spark.rdd._ class GraphSuite extends FunSuite with LocalSparkContext { @@ -20,6 +20,21 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("Graph Creation with invalid vertices") { + withSpark(new SparkContext("local", "test")) { sc => + val rawEdges = (0L to 98L).zip((1L to 99L) :+ 0L) + val edges: RDD[Edge[Int]] = sc.parallelize(rawEdges).map { case (s, t) => Edge(s, t, 1) } + val vertices: RDD[(Vid, Boolean)] = sc.parallelize((0L until 10L).map(id => (id, true))) + val graph = Graph(vertices, edges, false) + assert( graph.edges.count() === rawEdges.size ) + assert( graph.vertices.count() === 100) + graph.triplets.map { et => + assert( (et.srcId < 10 && et.srcAttr) || (et.srcId >= 10 && !et.srcAttr) ) + assert( (et.dstId < 10 && et.dstAttr) || (et.dstId >= 10 && !et.dstAttr) ) + } + } + } + test("mapEdges") { withSpark(new SparkContext("local", "test")) { sc => val n = 3 diff --git a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala index 0d55cc01896cd234f439f1f9f64ef7aa4c0fcc65..6295f866b8c48eb7ec52feb5d20df462b559a78d 100644 --- a/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala +++ b/graph/src/test/scala/org/apache/spark/graph/SerializerSuite.scala @@ -15,13 +15,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer") System.setProperty("spark.kryo.registrator", "org.apache.spark.graph.GraphKryoRegistrator") - test("TestVertexBroadcastMessageInt") { - val outMsg = new VertexBroadcastMsg[Int](3,4,5) + test("IntVertexBroadcastMsgSerializer") { + val outMsg = new VertexBroadcastMsg[Int](3, 4, 5) val bout = new ByteArrayOutputStream val outStrm = new IntVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new IntVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Int] = inStrm.readObject() @@ -36,13 +36,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestVertexBroadcastMessageLong") { - val outMsg = new VertexBroadcastMsg[Long](3,4,5) + test("LongVertexBroadcastMsgSerializer") { + val outMsg = new VertexBroadcastMsg[Long](3, 4, 5) val bout = new ByteArrayOutputStream val outStrm = new LongVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new LongVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Long] = inStrm.readObject() @@ -57,13 +57,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestVertexBroadcastMessageDouble") { - val outMsg = new VertexBroadcastMsg[Double](3,4,5.0) + test("DoubleVertexBroadcastMsgSerializer") { + val outMsg = new VertexBroadcastMsg[Double](3, 4, 5.0) val bout = new ByteArrayOutputStream val outStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new DoubleVertexBroadcastMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: VertexBroadcastMsg[Double] = inStrm.readObject() @@ -78,13 +78,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestAggregationMessageInt") { - val outMsg = new AggregationMsg[Int](4,5) + test("IntAggMsgSerializer") { + val outMsg = new AggregationMsg[Int](4, 5) val bout = new ByteArrayOutputStream val outStrm = new IntAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new IntAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: AggregationMsg[Int] = inStrm.readObject() @@ -99,13 +99,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestAggregationMessageLong") { - val outMsg = new AggregationMsg[Long](4,5) + test("LongAggMsgSerializer") { + val outMsg = new AggregationMsg[Long](4, 1L << 32) val bout = new ByteArrayOutputStream val outStrm = new LongAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new LongAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: AggregationMsg[Long] = inStrm.readObject() @@ -120,13 +120,13 @@ class SerializerSuite extends FunSuite with LocalSparkContext { } } - test("TestAggregationMessageDouble") { - val outMsg = new AggregationMsg[Double](4,5.0) + test("DoubleAggMsgSerializer") { + val outMsg = new AggregationMsg[Double](4, 5.0) val bout = new ByteArrayOutputStream val outStrm = new DoubleAggMsgSerializer().newInstance().serializeStream(bout) outStrm.writeObject(outMsg) outStrm.writeObject(outMsg) - bout.flush + bout.flush() val bin = new ByteArrayInputStream(bout.toByteArray) val inStrm = new DoubleAggMsgSerializer().newInstance().deserializeStream(bin) val inMsg1: AggregationMsg[Double] = inStrm.readObject()