diff --git a/core/src/main/scala/org/apache/spark/Accumulable.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala
index c76720c4bb8b22e0bea077c3403958019c4851bc..799c7e4fd50068cbdbf0479616ea029aa17b4491 100644
--- a/core/src/main/scala/org/apache/spark/Accumulable.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark
 
-import java.io.{ObjectInputStream, Serializable}
+import java.io.Serializable
 
 import scala.collection.generic.Growable
 import scala.reflect.ClassTag
 
 import org.apache.spark.scheduler.AccumulableInfo
 import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.Utils
 
 
 /**
@@ -49,6 +48,7 @@ import org.apache.spark.util.Utils
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulable[R, T] private (
     val id: Long,
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
@@ -162,6 +162,7 @@ class Accumulable[R, T] private (
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulableParam[R, T] extends Serializable {
   /**
    * Add additional data to the accumulator value. Is allowed to modify and return `r`
@@ -191,6 +192,7 @@ trait AccumulableParam[R, T] extends Serializable {
 }
 
 
+@deprecated("use AccumulatorV2", "2.0.0")
 private[spark] class
 GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
   extends AccumulableParam[R, T] {
diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala
index 9b007b97760883e14f4680255bd0872535c507d6..e52d36b7b564b003a4e25667089e86c22fffc495 100644
--- a/core/src/main/scala/org/apache/spark/Accumulator.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,13 +17,6 @@
 
 package org.apache.spark
 
-import java.util.concurrent.atomic.AtomicLong
-import javax.annotation.concurrent.GuardedBy
-
-import scala.collection.mutable
-import scala.ref.WeakReference
-
-import org.apache.spark.internal.Logging
 import org.apache.spark.storage.{BlockId, BlockStatus}
 
 
@@ -58,7 +51,8 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
  * @param name human-readable name associated with this accumulator
  * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
- */
+*/
+@deprecated("use AccumulatorV2", "2.0.0")
 class Accumulator[T] private[spark] (
     // SI-8813: This must explicitly be a private val, or else scala 2.11 doesn't compile
     @transient private val initialValue: T,
@@ -75,6 +69,7 @@ class Accumulator[T] private[spark] (
  *
  * @tparam T type of value to accumulate
  */
+@deprecated("use AccumulatorV2", "2.0.0")
 trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   def addAccumulator(t1: T, t2: T): T = {
     addInPlace(t1, t2)
@@ -82,6 +77,7 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
 }
 
 
+@deprecated("use AccumulatorV2", "2.0.0")
 object AccumulatorParam {
 
   // The following implicit objects were in SparkContext before 1.2 and users had to
@@ -89,21 +85,25 @@ object AccumulatorParam {
   // them automatically. However, as there are duplicate codes in SparkContext for backward
   // compatibility, please update them accordingly if you modify the following implicit objects.
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
     def addInPlace(t1: Double, t2: Double): Double = t1 + t2
     def zero(initialValue: Double): Double = 0.0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
     def addInPlace(t1: Int, t2: Int): Int = t1 + t2
     def zero(initialValue: Int): Int = 0
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
     def addInPlace(t1: Long, t2: Long): Long = t1 + t2
     def zero(initialValue: Long): Long = 0L
   }
 
+  @deprecated("use AccumulatorV2", "2.0.0")
   implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
     def addInPlace(t1: Float, t2: Float): Float = t1 + t2
     def zero(initialValue: Float): Float = 0f
@@ -112,6 +112,7 @@ object AccumulatorParam {
   // Note: when merging values, this param just adopts the newer value. This is used only
   // internally for things that shouldn't really be accumulated across tasks, like input
   // read method, which should be the same across all tasks in the same stage.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
     def addInPlace(t1: String, t2: String): String = t2
     def zero(initialValue: String): String = ""
@@ -119,12 +120,14 @@ object AccumulatorParam {
 
   // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
   // A better way to use this is to first accumulate the values yourself then them all at once.
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
     def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
     def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
   }
 
   // For the internal metric that records what blocks are updated in a particular task
+  @deprecated("use AccumulatorV2", "2.0.0")
   private[spark] object UpdatedBlockStatusesAccumulatorParam
     extends ListAccumulatorParam[(BlockId, BlockStatus)]
 
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d0f88d4e4d10968ba1302a07da841844f1854cb6..302dec25c66bd0fad6d4f681bdb6a294e4d2bd50 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1219,6 +1219,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = {
     val acc = new Accumulator(initialValue, param)
     cleaner.foreach(_.registerAccumulatorForCleanup(acc.newAcc))
@@ -1230,6 +1231,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
    * driver can access the accumulator's `value`.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T])
     : Accumulator[T] = {
     val acc = new Accumulator(initialValue, param, Some(name))
@@ -1243,6 +1245,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param)
@@ -1257,6 +1260,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * @tparam R accumulator result type
    * @tparam T type that can be added to the accumulator
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T])
     : Accumulable[R, T] = {
     val acc = new Accumulable(initialValue, param, Some(name))
@@ -1270,6 +1274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
+  @deprecated("use AccumulatorV2", "2.0.0")
   def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T]
       (initialValue: R): Accumulable[R, T] = {
     val param = new GrowableAccumulableParam[R, T]