diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 5a1ca496267be11a81d0caaf6c6fde657cf76176..16e365789890da780f992e127bf6974ec0ba46c3 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -19,7 +19,7 @@ class Accumulable[T,R] (
 
   /**
    * add more data to this accumulator / accumulable
-   * @param term
+   * @param term the data to add
    */
   def += (term: R) { value_ = param.addAccumulator(value_, term) }
 
@@ -27,7 +27,7 @@ class Accumulable[T,R] (
   * merge two accumulable objects together
   * <p>
   * Normally, a user will not want to use this version, but will instead call `+=`.
-   * @param term
+   * @param term the other Accumulable that will get merged with this
   */
  def ++= (term: T) { value_ = param.addInPlace(value_, term)}
  def value = this.value_
@@ -64,33 +64,33 @@ trait AccumulatorParam[T] extends AccumulableParam[T,T] {
 
 /**
  * A datatype that can be accumulated, ie. has a commutative & associative +.
- * <p>
+ *
  * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
  * the same operation (eg., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
  * always be the same, though -- eg., imagine you are accumulating a set. You will add items to the set, and you
  * will union two sets together.
  *
- * @tparam T the full accumulated data
- * @tparam R partial data that can be added in
+ * @tparam R the full accumulated data
+ * @tparam T partial data that can be added in
  */
-trait AccumulableParam[T,R] extends Serializable {
+trait AccumulableParam[R,T] extends Serializable {
   /**
    * Add additional data to the accumulator value.
    * @param t1 the current value of the accumulator
    * @param t2 the data to be added to the accumulator
    * @return the new value of the accumulator
    */
-  def addAccumulator(t1: T, t2: R) : T
+  def addAccumulator(t1: R, t2: T) : R
 
   /**
    * merge two accumulated values together
-   * @param t1
-   * @param t2
-   * @return
+   * @param t1 one set of accumulated data
+   * @param t2 another set of accumulated data
+   * @return both data sets merged together
    */
-  def addInPlace(t1: T, t2: T): T
+  def addInPlace(t1: R, t2: R): R
 
-  def zero(initialValue: T): T
+  def zero(initialValue: R): R
 }
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index ea85324c35f8994ac22de8e5c13fe0dd2a44e873..32f37822a560e79f2371fdfcd6b021d879f3a4d3 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -287,11 +287,8 @@ class SparkContext(
 
   /**
    * create an accumulatable shared variable, with a `+=` method
-   * @param initialValue
-   * @param param
    * @tparam T accumulator type
    * @tparam R type that can be added to the accumulator
-   * @return
    */
  def accumulable[T,R](initialValue: T)(implicit param: AccumulableParam[T,R]) =
    new Accumulable(initialValue, param)
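
Usage sketch (not part of the patch): a minimal AccumulableParam for the "accumulating a set" case described in the scaladoc above, written against the post-patch signatures where R is the fully accumulated type and T is the type of each added item. The SparkContext `sc`, the RDD `words`, and the object name `StringSetParam` are assumptions made purely for illustration.

  import spark.AccumulableParam

  // Accumulate a Set[String]: add individual strings, union whole sets.
  object StringSetParam extends AccumulableParam[Set[String], String] {
    // add one item to the accumulated set
    def addAccumulator(t1: Set[String], t2: String): Set[String] = t1 + t2
    // merge two partially accumulated sets (eg., results from different tasks)
    def addInPlace(t1: Set[String], t2: Set[String]): Set[String] = t1 ++ t2
    // the neutral value to start accumulating from
    def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
  }

  // sc.accumulable[T,R] takes the accumulator type first, the added type second
  val seen = sc.accumulable(Set.empty[String])(StringSetParam)
  words.foreach(w => seen += w)   // tasks add individual items with +=
  println(seen.value)             // the driver reads the merged set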