Skip to content
Snippets Groups Projects
Commit 2d511ab3 authored by Hossein Falaki's avatar Hossein Falaki
Browse files

Made SerializableHyperLogLog Externalizable and added Kryo tests

parent 13227aaa
No related branches found
No related tags found
No related merge requests found
......@@ -17,26 +17,27 @@
package org.apache.spark.util
import java.io.{ObjectOutputStream, ObjectInputStream}
import java.io.{Externalizable, ObjectOutput, ObjectInput}
import com.clearspring.analytics.stream.cardinality.{ICardinality, HyperLogLog}
/**
* A wrapper around com.clearspring.analytics.stream.cardinality.HyperLogLog that is serializable.
* A wrapper around [[com.clearspring.analytics.stream.cardinality.HyperLogLog]] that is serializable.
*/
private[spark]
class SerializableHyperLogLog(@transient var value: ICardinality) extends Serializable {
class SerializableHyperLogLog(var value: ICardinality) extends Externalizable {
def this() = this(null) // For deserialization
def merge(other: SerializableHyperLogLog) = new SerializableHyperLogLog(value.merge(other.value))
private def readObject(in: ObjectInputStream) {
def readExternal(in: ObjectInput) {
val byteLength = in.readInt()
val bytes = new Array[Byte](byteLength)
in.readFully(bytes)
value = HyperLogLog.Builder.build(bytes)
}
private def writeObject(out: ObjectOutputStream) {
def writeExternal(out: ObjectOutput) {
val bytes = value.getBytes()
out.writeInt(bytes.length)
out.write(bytes)
......
......@@ -172,6 +172,10 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === (1, 11))
}
test("kryo with SerializableHyperLogLog") {
assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countDistinct(0.01) === 3)
}
test("kryo with reduce") {
val control = 1 :: 2 :: Nil
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment