Commit d5c79f56 authored by Reynold Xin

[SPARK-15054] Deprecate old accumulator API

## What changes were proposed in this pull request?
This patch deprecates the old accumulator API.
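For users migrating off the deprecated API, the replacement is `org.apache.spark.util.AccumulatorV2` and the typed helpers on `SparkContext`. A minimal before/after sketch (illustrative only, not part of this patch; the object name and local-mode setup are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorMigration {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("acc-demo").setMaster("local[2]"))
    val rdd = sc.parallelize(1 to 100)

    // Old API, deprecated by this patch: sc.accumulator resolves the
    // implicit IntAccumulatorParam and now emits a deprecation warning.
    val oldAcc = sc.accumulator(0, "odd.count.old")
    rdd.foreach { x => if (x % 2 == 1) oldAcc += 1 }

    // New API: a named LongAccumulator created through SparkContext.
    val newAcc = sc.longAccumulator("odd.count.new")
    rdd.foreach { x => if (x % 2 == 1) newAcc.add(1) }

    println(s"old=${oldAcc.value}, new=${newAcc.value}")  // reads are driver-side only
    sc.stop()
  }
}
```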

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #12832 from rxin/SPARK-15054.
parent 8a1ce489
core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -17,14 +17,13 @@
 package org.apache.spark
 
-import java.io.{ObjectInputStream, Serializable}
+import java.io.Serializable
 
 import scala.collection.generic.Growable
 import scala.reflect.ClassTag
 
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.Utils
 
 /**
@@ -49,6 +48,7 @@ import org.apache.spark.util.Utils
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulable[R, T] private (
     val id: Long,
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@@ -162,6 +162,7 @@ class Accumulable[R, T] private (
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulableParam[R, T] extends Serializable {
   /**
    * Add additional data to the accumulator value. Is allowed to modify and return `r`
@@ -191,6 +192,7 @@ trait AccumulableParam[R, T] extends Serializable {
 }
 
+@deprecated("use AccumulatorV2", "2.0.0")
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
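For comparison, a custom `AccumulableParam`/`GrowableAccumulableParam` like the ones deprecated above translates to a subclass of `org.apache.spark.util.AccumulatorV2[IN, OUT]`. A minimal sketch under that assumption (the `SetAccumulator` name is illustrative, not from this commit):

```scala
import scala.collection.mutable

import org.apache.spark.util.AccumulatorV2

// Accumulates distinct String values, roughly what a Growable-backed
// Accumulable[mutable.Set[String], String] did under the old API.
class SetAccumulator extends AccumulatorV2[String, Set[String]] {
  private val set = mutable.Set.empty[String]
  override def isZero: Boolean = set.isEmpty
  override def copy(): SetAccumulator = {
    val acc = new SetAccumulator
    acc.set ++= set
    acc
  }
  override def reset(): Unit = set.clear()
  override def add(v: String): Unit = set += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
    case o: SetAccumulator => set ++= o.set
    case _ => throw new UnsupportedOperationException("cannot merge incompatible accumulator")
  }
  override def value: Set[String] = set.toSet
}
```

Unlike the implicit-param style, a V2 accumulator is registered explicitly before use, e.g. `sc.register(new SetAccumulator, "names")`.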
core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,13 +17,6 @@
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable
-import scala.ref.WeakReference
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.storage.{BlockId, BlockStatus}
@@ -58,7 +51,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @param name human-readable name associated with this accumulator
  * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulator[T] private[spark] (
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
     @transient private val initialValue: T,
@@ -75,6 +69,7 @@ class Accumulator[T] private[spark] (
  *
  * @tparam T type of value to accumulate
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   def addAccumulator(t1: T, t2: T): T = {
     addInPlace(t1, t2)
@@ -82,6 +77,7 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
 }
 
+@deprecated("use AccumulatorV2", "2.0.0")
 object AccumulatorParam {
 
   // The following implicit objects were in SparkContext before 1.2 and users had to
@@ -89,21 +85,25 @@ object AccumulatorParam {
   // them automatically. However, as there are duplicate codes in SparkContext for backward
   // compatibility, please update them accordingly if you modify the following implicit objects.
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double): Double = 0.0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int): Int = 0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long): Long = t1 + t2
     def zero(initialValue: Long): Long = 0L
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float): Float = t1 + t2
     def zero(initialValue: Float): Float = 0f
@@ -112,6 +112,7 @@ object AccumulatorParam {
   // Note: when merging values, this param just adopts the newer value. This is used only
   // internally for things that shouldn't really be accumulated across tasks, like input
   // read method, which should be the same across all tasks in the same stage.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
     def addInPlace(t1: String, t2: String): String = t2
     def zero(initialValue: String): String = ""
@@ -119,12 +120,14 @@ object AccumulatorParam {
 
   // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
   // A better way to use this is to first accumulate the values yourself then them all at once.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
     def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
     def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
   }
 
   // For the internal metric that records what blocks are updated in a particular task
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object UpdatedBlockStatusesAccumulatorParam
     extends ListAccumulatorParam[(BlockId, BlockStatus)]
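The implicit params deprecated above have no one-to-one V2 equivalents: the Int and Long cases collapse onto `LongAccumulator`, Float and Double onto `DoubleAccumulator`, and `ListAccumulatorParam` roughly corresponds to `CollectionAccumulator`. A sketch of that mapping, assuming a `SparkContext` named `sc` is already in scope:

```scala
import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}

// Assumes sc: org.apache.spark.SparkContext is already in scope.
val longAcc = new LongAccumulator     // stands in for Int/LongAccumulatorParam
val doubleAcc = new DoubleAccumulator // stands in for Float/DoubleAccumulatorParam
sc.register(longAcc, "counter")
sc.register(doubleAcc, "sum")

sc.parallelize(Seq(1, 2, 3)).foreach { x =>
  longAcc.add(x)          // Int widens to Long
  doubleAcc.add(x * 0.5)
}

// Nearest replacement for the list-based params:
val listAcc = sc.collectionAccumulator[String]("events")
```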
core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1219,6 +1219,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
     val acc = new Accumulator(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -1230,6 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
     : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
@@ -1243,6 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
@@ -1257,6 +1260,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
@@ -1270,6 +1274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
       (initialValue: R): Accumulable[R, T] = {
     val param = new GrowableAccumulableParam[R, T]
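Taken together, the deprecated `SparkContext` factories above line up with the V2 creation helpers roughly as follows; `accumulableCollection` has no exact drop-in, with `collectionAccumulator` as the nearest fit. A sketch (the helper names are the Spark 2.0 ones; `sc` is assumed to be an existing `SparkContext`):

```scala
// Deprecated factory (this patch)                 Spark 2.0 replacement
// sc.accumulator(0L)                           -> sc.longAccumulator
// sc.accumulator(0L, "n")                      -> sc.longAccumulator("n")
// sc.accumulator(0.0, "s")                     -> sc.doubleAccumulator("s")
// sc.accumulable(initial)(param)               -> sc.register(customAccumulatorV2)
// sc.accumulableCollection(mutable.Buffer[T]()) -> sc.collectionAccumulator[T]("name")
val names = sc.collectionAccumulator[String]("names")
sc.parallelize(Seq("a", "b")).foreach(names.add)
println(names.value)  // a java.util.List[String], readable only on the driver
```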