diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
index 6d30d3c76a9fbf23879d38e120656d2722892697..83e11c5e236d46f945417ca58ae2f09874fec9d2 100644
--- a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala
@@ -81,35 +81,9 @@ class InputMetrics private (
    */
   def readMethod: DataReadMethod.Value = DataReadMethod.withName(_readMethod.localValue)
 
-  // Once incBytesRead & intRecordsRead is ready to be removed from the public API
-  // we can remove the internal versions and make the previous public API private.
-  // This has been done to suppress warnings when building.
-  @deprecated("incrementing input metrics is for internal use only", "2.0.0")
-  def incBytesRead(v: Long): Unit = _bytesRead.add(v)
-  private[spark] def incBytesReadInternal(v: Long): Unit = _bytesRead.add(v)
-  @deprecated("incrementing input metrics is for internal use only", "2.0.0")
-  def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
-  private[spark] def incRecordsReadInternal(v: Long): Unit = _recordsRead.add(v)
+  private[spark] def incBytesRead(v: Long): Unit = _bytesRead.add(v)
+  private[spark] def incRecordsRead(v: Long): Unit = _recordsRead.add(v)
   private[spark] def setBytesRead(v: Long): Unit = _bytesRead.setValue(v)
-  private[spark] def setReadMethod(v: DataReadMethod.Value): Unit =
-    _readMethod.setValue(v.toString)
+  private[spark] def setReadMethod(v: DataReadMethod.Value): Unit = _readMethod.setValue(v.toString)
 
 }
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object InputMetrics {
-
-  @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
-  def apply(readMethod: DataReadMethod.Value): InputMetrics = {
-    val im = new InputMetrics
-    im.setReadMethod(readMethod)
-    im
-  }
-
-  @deprecated("matching on InputMetrics will not be supported in the future", "2.0.0")
-  def unapply(input: InputMetrics): Option[DataReadMethod.Value] = {
-    Some(input.readMethod)
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
index 0b37d559c746277478f0d8a07f2d65cb6c6639fb..93f953846fe26aae349d9e500e6f6f5700bdd7c7 100644
--- a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala
@@ -51,18 +51,6 @@ class OutputMetrics private (
       TaskMetrics.getAccum[String](accumMap, InternalAccumulator.output.WRITE_METHOD))
   }
 
-  /**
-   * Create a new [[OutputMetrics]] that is not associated with any particular task.
-   *
-   * This is only used for preserving matching behavior on [[OutputMetrics]], which used to be
-   * a case class before Spark 2.0. Once we remove support for matching on [[OutputMetrics]]
-   * we can remove this constructor as well.
-   */
-  private[executor] def this() {
-    this(InternalAccumulator.createOutputAccums()
-      .map { a => (a.name.get, a) }.toMap[String, Accumulator[_]])
-  }
-
   /**
    * Total number of bytes written.
    */
@@ -84,21 +72,3 @@ class OutputMetrics private (
     _writeMethod.setValue(v.toString)
 
 }
-
-/**
- * Deprecated methods to preserve case class matching behavior before Spark 2.0.
- */
-object OutputMetrics {
-
-  @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
-  def apply(writeMethod: DataWriteMethod.Value): OutputMetrics = {
-    val om = new OutputMetrics
-    om.setWriteMethod(writeMethod)
-    om
-  }
-
-  @deprecated("matching on OutputMetrics will not be supported in the future", "2.0.0")
-  def unapply(output: OutputMetrics): Option[DataWriteMethod.Value] = {
-    Some(output.writeMethod)
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
index 50bb645d974a358198854b8112c475fdb85b5f1d..71a24770b50ae1af619f5a1c01f382f91f638eab 100644
--- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala
@@ -116,4 +116,25 @@ class ShuffleReadMetrics private (
   private[spark] def setFetchWaitTime(v: Long): Unit = _fetchWaitTime.setValue(v)
   private[spark] def setRecordsRead(v: Long): Unit = _recordsRead.setValue(v)
 
+  /**
+   * Resets the value of the current metrics (`this`) and and merges all the independent
+   * [[ShuffleReadMetrics]] into `this`.
+   */
+  private[spark] def setMergeValues(metrics: Seq[ShuffleReadMetrics]): Unit = {
+    _remoteBlocksFetched.setValue(_remoteBlocksFetched.zero)
+    _localBlocksFetched.setValue(_localBlocksFetched.zero)
+    _remoteBytesRead.setValue(_remoteBytesRead.zero)
+    _localBytesRead.setValue(_localBytesRead.zero)
+    _fetchWaitTime.setValue(_fetchWaitTime.zero)
+    _recordsRead.setValue(_recordsRead.zero)
+    metrics.foreach { metric =>
+      _remoteBlocksFetched.add(metric.remoteBlocksFetched)
+      _localBlocksFetched.add(metric.localBlocksFetched)
+      _remoteBytesRead.add(metric.remoteBytesRead)
+      _localBytesRead.add(metric.localBytesRead)
+      _fetchWaitTime.add(metric.fetchWaitTime)
+      _recordsRead.add(metric.recordsRead)
+    }
+  }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index 02219a84abfd9d7107d9dda759ec6881f8057a8b..bda2a91d9d2cab3fd12282a0b3a56403eae64235 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -139,16 +139,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
    */
   def updatedBlockStatuses: Seq[(BlockId, BlockStatus)] = _updatedBlockStatuses.localValue
 
-  @deprecated("use updatedBlockStatuses instead", "2.0.0")
-  def updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = {
-    if (updatedBlockStatuses.nonEmpty) Some(updatedBlockStatuses) else None
-  }
-
-  @deprecated("setting updated blocks is not allowed", "2.0.0")
-  def updatedBlocks_=(blocks: Option[Seq[(BlockId, BlockStatus)]]): Unit = {
-    blocks.foreach(setUpdatedBlockStatuses)
-  }
-
   // Setters and increment-ers
   private[spark] def setExecutorDeserializeTime(v: Long): Unit =
     _executorDeserializeTime.setValue(v)
@@ -225,11 +215,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
    */
   def outputMetrics: Option[OutputMetrics] = _outputMetrics
 
-  @deprecated("setting OutputMetrics is for internal use only", "2.0.0")
-  def outputMetrics_=(om: Option[OutputMetrics]): Unit = {
-    _outputMetrics = om
-  }
-
   /**
    * Get or create a new [[OutputMetrics]] associated with this task.
    */
@@ -285,12 +270,7 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
   private[spark] def mergeShuffleReadMetrics(): Unit = synchronized {
     if (tempShuffleReadMetrics.nonEmpty) {
       val metrics = new ShuffleReadMetrics(initialAccumsMap)
-      metrics.setRemoteBlocksFetched(tempShuffleReadMetrics.map(_.remoteBlocksFetched).sum)
-      metrics.setLocalBlocksFetched(tempShuffleReadMetrics.map(_.localBlocksFetched).sum)
-      metrics.setFetchWaitTime(tempShuffleReadMetrics.map(_.fetchWaitTime).sum)
-      metrics.setRemoteBytesRead(tempShuffleReadMetrics.map(_.remoteBytesRead).sum)
-      metrics.setLocalBytesRead(tempShuffleReadMetrics.map(_.localBytesRead).sum)
-      metrics.setRecordsRead(tempShuffleReadMetrics.map(_.recordsRead).sum)
+      metrics.setMergeValues(tempShuffleReadMetrics)
       _shuffleReadMetrics = Some(metrics)
     }
   }
@@ -306,11 +286,6 @@ class TaskMetrics private[spark] (initialAccums: Seq[Accumulator[_]]) extends Se
    */
   def shuffleWriteMetrics: Option[ShuffleWriteMetrics] = _shuffleWriteMetrics
 
-  @deprecated("setting ShuffleWriteMetrics is for internal use only", "2.0.0")
-  def shuffleWriteMetrics_=(swm: Option[ShuffleWriteMetrics]): Unit = {
-    _shuffleWriteMetrics = swm
-  }
-
   /**
    * Get or create a new [[ShuffleWriteMetrics]] associated with this task.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f7c646c66892edf8f473718e4e9a31a3a5b82a48..35d190b464ff4c8bfe90c28dd3829a8984dea90d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -259,7 +259,7 @@ class HadoopRDD[K, V](
             finished = true
         }
         if (!finished) {
-          inputMetrics.incRecordsReadInternal(1)
+          inputMetrics.incRecordsRead(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -291,7 +291,7 @@ class HadoopRDD[K, V](
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.incBytesReadInternal(split.inputSplit.value.getLength)
+              inputMetrics.incBytesRead(split.inputSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index fb9606ae388d6fbc02a2824f36e0a544b668883c..3ccd616cbfd5744da06d44c726dd68f44bcc2b18 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -189,7 +189,7 @@ class NewHadoopRDD[K, V](
         }
         havePair = false
         if (!finished) {
-          inputMetrics.incRecordsReadInternal(1)
+          inputMetrics.incRecordsRead(1)
         }
         if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
           updateBytesRead()
@@ -220,7 +220,7 @@ class NewHadoopRDD[K, V](
             // If we can't get the bytes read from the FS stats, fall back to the split size,
             // which may be inaccurate.
             try {
-              inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
+              inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength)
             } catch {
               case e: java.io.IOException =>
                 logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 032939b49a708086ff69e11641f345d747c9b273..36ff3bcaaec629979731ba295ff2ee9c51249525 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -333,10 +333,10 @@ abstract class RDD[T: ClassTag](
       case Left(blockResult) =>
         if (readCachedBlock) {
           val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
-          existingMetrics.incBytesReadInternal(blockResult.bytes)
+          existingMetrics.incBytesRead(blockResult.bytes)
           new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
             override def next(): T = {
-              existingMetrics.incRecordsReadInternal(1)
+              existingMetrics.incRecordsRead(1)
               delegate.next()
             }
           }
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 3b78458065acb1fa14e02cef559380e93523633f..558767e36f7da32ad7a9ba773c012748eca4b07d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -813,8 +813,8 @@ private[spark] object JsonProtocol {
     Utils.jsonOption(json \ "Input Metrics").foreach { inJson =>
       val readMethod = DataReadMethod.withName((inJson \ "Data Read Method").extract[String])
       val inputMetrics = metrics.registerInputMetrics(readMethod)
-      inputMetrics.incBytesReadInternal((inJson \ "Bytes Read").extract[Long])
-      inputMetrics.incRecordsReadInternal((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
+      inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
+      inputMetrics.incRecordsRead((inJson \ "Records Read").extractOpt[Long].getOrElse(0L))
     }
 
     // Updated blocks
diff --git a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
index 088b05403c1aa258f301fd9ed97687e534de3f79..d91f50f18f43153ba180d73a5de5705dc458ddb0 100644
--- a/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/TaskMetricsSuite.scala
@@ -285,8 +285,8 @@ class TaskMetricsSuite extends SparkFunSuite {
     // set and increment values
     in.setBytesRead(1L)
     in.setBytesRead(2L)
-    in.incRecordsReadInternal(1L)
-    in.incRecordsReadInternal(2L)
+    in.incRecordsRead(1L)
+    in.incRecordsRead(2L)
     in.setReadMethod(DataReadMethod.Disk)
     // assert new values exist
     assertValEquals(_.bytesRead, BYTES_READ, 2L)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 6a2d4c9f2cecb0c113bd3cd8c6a69495506021e2..de6f408fa82bef0dd7005d42d8d47d833f9032bc 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -853,7 +853,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
     if (hasHadoopInput) {
       val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
       inputMetrics.setBytesRead(d + e + f)
-      inputMetrics.incRecordsReadInternal(if (hasRecords) (d + e + f) / 100 else -1)
+      inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
     } else {
       val sr = t.registerTempShuffleReadMetrics()
       sr.incRemoteBytesRead(b + d)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 313bf93b5d7602375fb72523a06995f5deb35525..71f337ce1f63e9debc67f904212bbaa0b6e794b1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -627,7 +627,10 @@ object MimaExcludes {
         ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.this")
       ) ++ Seq(
         // [SPARK-14475] Propagate user-defined context from driver to executors
-        ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty")
+        ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"),
+        // [SPARK-14617] Remove deprecated APIs in TaskMetrics
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$")
       )
     case v if v.startsWith("1.6") =>
       Seq(