diff --git a/src/examples/Vector.scala b/src/examples/Vector.scala index 0ae2cbc6e8d16cff6df992e965f8deb63cb4a86d..e9fbdca7523a36e8067b1d34e69960b35039d410 100644 --- a/src/examples/Vector.scala +++ b/src/examples/Vector.scala @@ -57,7 +57,7 @@ object Vector { implicit def doubleToMultiplier(num: Double) = new Multiplier(num) implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] { - def add(t1: Vector, t2: Vector) = t1 + t2 + def addInPlace(t1: Vector, t2: Vector) = t1 + t2 def zero(initialValue: Vector) = Vector.zeros(initialValue.length) } } diff --git a/src/scala/spark/Accumulators.scala b/src/scala/spark/Accumulators.scala index 3e4cd4935a19cce8599b0dac05046b5fc7966bae..ee93d3c85c0e9e318d8f0a1449a9fe81f4d6da07 100644 --- a/src/scala/spark/Accumulators.scala +++ b/src/scala/spark/Accumulators.scala @@ -4,15 +4,17 @@ import java.io._ import scala.collection.mutable.Map -@serializable class Accumulator[T](initialValue: T, param: AccumulatorParam[T]) +@serializable class Accumulator[T]( + @transient initialValue: T, param: AccumulatorParam[T]) { val id = Accumulators.newId - @transient var value_ = initialValue + @transient var value_ = initialValue // Current value on master + val zero = param.zero(initialValue) // Zero value to be passed to workers var deserialized = false Accumulators.register(this) - def += (term: T) { value_ = param.add(value_, term) } + def += (term: T) { value_ = param.addInPlace(value_, term) } def value = this.value_ def value_= (t: T) { if (!deserialized) value_ = t @@ -22,7 +24,7 @@ import scala.collection.mutable.Map // Called by Java when deserializing an object private def readObject(in: ObjectInputStream) { in.defaultReadObject - value_ = param.zero(initialValue) + value_ = zero deserialized = true Accumulators.register(this) } @@ -31,7 +33,7 @@ import scala.collection.mutable.Map } @serializable trait AccumulatorParam[T] { - def add(t1: T, t2: T): T + def addInPlace(t1: T, t2: T): T def zero(initialValue: T): T } diff --git a/src/scala/spark/RDD.scala b/src/scala/spark/RDD.scala index aaf006b6cbdbe2ca334db1a02ff6a97e3a5b2bdc..5236eb958ffb4ae9023deebef784a3ab7275b37c 100644 --- a/src/scala/spark/RDD.scala +++ b/src/scala/spark/RDD.scala @@ -165,7 +165,10 @@ extends RDD[Array[T]](prev.sparkContext) { } -@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {} +@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split { + override def toString() = + "SeededSplit(" + prev.toString + ", seed " + seed + ")" +} class SampledRDD[T: ClassManifest]( prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int) @@ -268,6 +271,7 @@ private object CachedRDD { abstract class UnionSplit[T: ClassManifest] extends Split { def iterator(): Iterator[T] def preferredLocations(): Seq[String] + def toString(): String } @serializable @@ -276,6 +280,8 @@ class UnionSplitImpl[T: ClassManifest]( extends UnionSplit[T] { override def iterator() = rdd.iterator(split) override def preferredLocations() = rdd.preferredLocations(split) + override def toString() = + "UnionSplitImpl(" + split.toString + ")" } @serializable @@ -297,7 +303,10 @@ extends RDD[T](sc) { s.asInstanceOf[UnionSplit[T]].preferredLocations() } -@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split +@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split { + override def toString() = + "CartesianSplit(" + s1.toString + ", " + s2.toString + ")" +} @serializable class CartesianRDD[T: ClassManifest, U:ClassManifest]( diff --git a/src/scala/spark/SparkContext.scala b/src/scala/spark/SparkContext.scala index 20f04f863966c90a1d94f72e3b8bf6389016d53d..62e49271bfdcd2ce143030294c0cf96f80e3b778 100644 --- a/src/scala/spark/SparkContext.scala +++ b/src/scala/spark/SparkContext.scala @@ -82,12 +82,12 @@ class SparkContext(master: String, frameworkName: String) extends Logging { object SparkContext { implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { - def add(t1: Double, t2: Double): Double = t1 + t2 + def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 } implicit object IntAccumulatorParam extends AccumulatorParam[Int] { - def add(t1: Int, t2: Int): Int = t1 + t2 + def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int) = 0 }