diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index d375926a910e66b8b345a43ae5dcf83fa5a9baf7..70fd627c6fe69b5460f1b24885dd21932dc961b7 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -1352,41 +1352,42 @@ The code below shows an accumulator being used to add up the elements of an arra
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-scala> val accum = sc.accumulator(0, "My Accumulator")
-accum: org.apache.spark.Accumulator[Int] = 0
+scala> val accum = sc.longAccumulator("My Accumulator")
+accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
 
-scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
 ...
 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
 
 scala> accum.value
-res2: Int = 10
+res2: Long = 10
 {% endhighlight %}
 
-While this code used the built-in support for accumulators of type Int, programmers can also
-create their own types by subclassing [AccumulatorParam](api/scala/index.html#org.apache.spark.AccumulatorParam).
-The AccumulatorParam interface has two methods: `zero` for providing a "zero value" for your data
-type, and `addInPlace` for adding two values together. For example, supposing we had a `Vector` class
+While this code used the built-in support for accumulators of type Long, programmers can also
+create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.util.AccumulatorV2).
+The AccumulatorV2 abstract class has several methods which one has to override:
+`reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, and `merge` for merging another same-type accumulator into this one. The other methods that must be overridden are listed in the Scala API documentation. For example, supposing we had a `MyVector` class
 representing mathematical vectors, we could write:
 
 {% highlight scala %}
-object VectorAccumulatorParam extends AccumulatorParam[Vector] {
-  def zero(initialValue: Vector): Vector = {
-    Vector.zeros(initialValue.size)
+class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
+  val vec_ : MyVector = MyVector.createZeroVector
+  def reset(): Unit = {
+    vec_.reset()
   }
-  def addInPlace(v1: Vector, v2: Vector): Vector = {
-    v1 += v2
+  def add(v: MyVector): Unit = {
+    vec_.add(v)
   }
+  ...
 }
 
 // Then, create an Accumulator of this type:
-val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
+val myVectorAcc = new VectorAccumulatorV2
+// Then, register it with the Spark context:
+sc.register(myVectorAcc, "MyVectorAcc1")
 {% endhighlight %}
 
-In Scala, Spark also supports the more general [Accumulable](api/scala/index.html#org.apache.spark.Accumulable)
-interface to accumulate data where the resulting type is not the same as the elements added (e.g. build
-a list by collecting together elements), and the `SparkContext.accumulableCollection` method for accumulating
-common Scala collection types.
+Note that when programmers define their own type of AccumulatorV2, the resulting type can differ from the type of the elements added.
 
 </div>
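
Review note: the hunk above elides the remaining `AccumulatorV2` overrides with `...`. For readers of this patch, the following is a minimal sketch of what a complete subclass might look like. It is not part of the patch. `MyVector` is a hypothetical stand-in (the guide never defines it), and the `size` constructor parameter, `isZero`, and `copy` helpers on `MyVector` are assumptions added so the example is self-contained. The overridden members (`isZero`, `copy`, `reset`, `add`, `merge`, `value`) are the abstract members of `org.apache.spark.util.AccumulatorV2`.

```scala
import org.apache.spark.util.AccumulatorV2

// Hypothetical stand-in for the guide's `MyVector`:
// a mutable, fixed-length vector of doubles.
class MyVector(val values: Array[Double]) extends Serializable {
  // Element-wise in-place addition.
  def add(other: MyVector): Unit = {
    require(other.values.length == values.length, "dimension mismatch")
    var i = 0
    while (i < values.length) {
      values(i) += other.values(i)
      i += 1
    }
  }
  def reset(): Unit = java.util.Arrays.fill(values, 0.0)
  def isZero: Boolean = values.forall(_ == 0.0)
}

object MyVector {
  // Assumption: the zero vector needs an explicit size here.
  def createZeroVector(size: Int): MyVector = new MyVector(new Array[Double](size))
}

class VectorAccumulatorV2(size: Int) extends AccumulatorV2[MyVector, MyVector] {
  private val vec: MyVector = MyVector.createZeroVector(size)

  // True while nothing non-zero has been accumulated.
  override def isZero: Boolean = vec.isZero

  // Independent accumulator holding the same running total.
  override def copy(): VectorAccumulatorV2 = {
    val acc = new VectorAccumulatorV2(size)
    acc.vec.add(vec)
    acc
  }

  override def reset(): Unit = vec.reset()

  // Called on executors for each element added.
  override def add(v: MyVector): Unit = vec.add(v)

  // Called on the driver to fold a task's partial result into this one.
  override def merge(other: AccumulatorV2[MyVector, MyVector]): Unit =
    vec.add(other.value)

  override def value: MyVector = vec
}

// In spark-shell, where `sc` is the active SparkContext:
// val myVectorAcc = new VectorAccumulatorV2(3)
// sc.register(myVectorAcc, "MyVectorAcc1")
// sc.parallelize(Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)))
//   .foreach(a => myVectorAcc.add(new MyVector(a)))
```

Declaring the accumulator as a `class` rather than an `object` lets each call site create its own instance with `new` before passing it to `sc.register`. Here the input and output types are both `MyVector`, but the two type parameters of `AccumulatorV2` may differ.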