diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index dfd2f818acdac23c92f1e2a3c9f467c5dafca3aa..a3ce3d1ccc5e3410e6e933bb03754110a7d3ce7e 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -251,13 +251,10 @@ class TaskMetrics private[spark] () extends Serializable {
 
   private[spark] def accumulators(): Seq[AccumulatorV2[_, _]] = internalAccums ++ externalAccums
 
-  /**
-   * Looks for a registered accumulator by accumulator name.
-   */
-  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
-    accumulators.find { acc =>
-      acc.name.isDefined && acc.name.get == name
-    }
+  private[spark] def nonZeroInternalAccums(): Seq[AccumulatorV2[_, _]] = {
+    // The RESULT_SIZE accumulator is always zero on the executor side; we still need to send
+    // it back because its value will be updated on the driver side.
+    internalAccums.filter(a => !a.isZero || a == _resultSize)
   }
 }
 
@@ -308,16 +305,16 @@ private[spark] object TaskMetrics extends Logging {
    */
   def fromAccumulators(accums: Seq[AccumulatorV2[_, _]]): TaskMetrics = {
     val tm = new TaskMetrics
-    val (internalAccums, externalAccums) =
-      accums.partition(a => a.name.isDefined && tm.nameToAccums.contains(a.name.get))
-
-    internalAccums.foreach { acc =>
-      val tmAcc = tm.nameToAccums(acc.name.get).asInstanceOf[AccumulatorV2[Any, Any]]
-      tmAcc.metadata = acc.metadata
-      tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
+    for (acc <- accums) {
+      val name = acc.name
+      if (name.isDefined && tm.nameToAccums.contains(name.get)) {
+        val tmAcc = tm.nameToAccums(name.get).asInstanceOf[AccumulatorV2[Any, Any]]
+        tmAcc.metadata = acc.metadata
+        tmAcc.merge(acc.asInstanceOf[AccumulatorV2[Any, Any]])
+      } else {
+        tm.externalAccums += acc
+      }
     }
-
-    tm.externalAccums ++= externalAccums
     tm
   }
 }
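
The filtering rule introduced by `nonZeroInternalAccums()` is small but easy to get backwards, so here is a standalone sketch of the predicate `!a.isZero || a == _resultSize` (toy classes only, not Spark's real `AccumulatorV2`; the metric names are illustrative): zero-valued internal metrics are dropped, while the result-size accumulator is kept even though it is still zero on executors, because the driver fills it in later.

```scala
// Standalone illustration of the nonZeroInternalAccums() predicate.
// ToyAccum stands in for AccumulatorV2; the names are hypothetical.
final class ToyAccum(val name: String, var value: Long = 0L) {
  def isZero: Boolean = value == 0L
}

object NonZeroInternalAccumsSketch {
  def main(args: Array[String]): Unit = {
    val resultSize = new ToyAccum("internal.metrics.resultSize")           // always 0 on executors
    val runTime    = new ToyAccum("internal.metrics.executorRunTime", 42L) // non-zero, kept
    val gcTime     = new ToyAccum("internal.metrics.jvmGCTime")            // zero, dropped

    // Same shape as the patch: keep non-zero accumulators, plus RESULT_SIZE unconditionally.
    val toSend = Seq(resultSize, runTime, gcTime).filter(a => !a.isZero || (a eq resultSize))

    assert(toSend.map(_.name) ==
      Seq("internal.metrics.resultSize", "internal.metrics.executorRunTime"))
  }
}
```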
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index 7fd2918960cd03a18cdedc214a6d9ac9132e8159..5c337b992c840fc7f575bce3b69c353e7abe7297 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -182,14 +182,11 @@ private[spark] abstract class Task[T](
    */
   def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
     if (context != null) {
-      context.taskMetrics.internalAccums.filter { a =>
-        // RESULT_SIZE accumulator is always zero at executor, we need to send it back as its
-        // value will be updated at driver side.
-        // Note: internal accumulators representing task metrics always count failed values
-        !a.isZero || a.name == Some(InternalAccumulator.RESULT_SIZE)
-      // zero value external accumulators may still be useful, e.g. SQLMetrics, we should not filter
-      // them out.
-      } ++ context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
+      // Note: internal accumulators representing task metrics always count failed values
+      context.taskMetrics.nonZeroInternalAccums() ++
+        // Zero-value external accumulators may still be useful, e.g. SQLMetrics; we should
+        // not filter them out.
+        context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
     } else {
       Seq.empty
     }
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 7479de55140ead8ba9ac7e219cfe9095d1f99c35..a65ec75cc5db6e4b58a6ddb35244cc224dcd823a 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -84,8 +84,12 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
    * Returns the name of this accumulator, can only be called after registration.
    */
   final def name: Option[String] = {
-    assertMetadataNotNull()
-    metadata.name
+    if (atDriverSide) {
+      AccumulatorContext.get(id).flatMap(_.metadata.name)
+    } else {
+      assertMetadataNotNull()
+      metadata.name
+    }
   }
 
   /**
@@ -161,7 +165,15 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
       }
       val copyAcc = copyAndReset()
       assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
-      copyAcc.metadata = metadata
+      val isInternalAcc =
+        (name.isDefined && name.get.startsWith(InternalAccumulator.METRICS_PREFIX)) ||
+          getClass.getSimpleName == "SQLMetric"
+      if (isInternalAcc) {
+        // Do not serialize the name of an internal accumulator when sending it to the executor.
+        copyAcc.metadata = metadata.copy(name = None)
+      } else {
+        copyAcc.metadata = metadata
+      }
       copyAcc
     } else {
       this
@@ -263,16 +275,6 @@ private[spark] object AccumulatorContext {
     originals.clear()
   }
 
-  /**
-   * Looks for a registered accumulator by accumulator name.
-   */
-  private[spark] def lookForAccumulatorByName(name: String): Option[AccumulatorV2[_, _]] = {
-    originals.values().asScala.find { ref =>
-      val acc = ref.get
-      acc != null && acc.name.isDefined && acc.name.get == name
-    }.map(_.get)
-  }
-
   // Identifier for distinguishing SQL metrics from other accumulators
   private[spark] val SQL_ACCUM_IDENTIFIER = "sql"
 }
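
The other half of the change: `name` is now resolved on the driver through `AccumulatorContext.get(id)`, which is what lets `writeReplace` strip the name from internal accumulators (and SQL metrics) before shipping them to executors without losing it on the driver. A minimal sketch of that idea, using a toy registry rather than Spark's actual classes:

```scala
import scala.collection.concurrent.TrieMap

// Toy registry standing in for AccumulatorContext: the driver keeps the original, named
// accumulator keyed by id, so a copy shipped without its name can still be resolved.
object ToyRegistry {
  private val originals = TrieMap.empty[Long, String] // id -> name
  def register(id: Long, name: String): Unit = originals.put(id, name)
  def nameOf(id: Long): Option[String] = originals.get(id)
}

// shippedName is what was serialized to the executor; None when the name was stripped.
final case class ToyAccumCopy(id: Long, shippedName: Option[String]) {
  def name(atDriverSide: Boolean): Option[String] =
    if (atDriverSide) ToyRegistry.nameOf(id) else shippedName
}

object DriverSideNameLookupSketch {
  def main(args: Array[String]): Unit = {
    ToyRegistry.register(1L, "internal.metrics.executorRunTime")
    val copy = ToyAccumCopy(1L, shippedName = None) // name dropped to save serialized bytes
    assert(copy.name(atDriverSide = true).contains("internal.metrics.executorRunTime"))
    assert(copy.name(atDriverSide = false).isEmpty)
  }
}
```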
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index 8f576daa77d15828f9c879c1c3d9cf02c89ab662..b22da565d86e78c649b9a84d18c786ac72a5974f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -198,7 +198,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
     sc = new SparkContext("local", "test")
     // Create a dummy task. We won't end up running this; we just want to collect
     // accumulator updates from it.
-    val taskMetrics = TaskMetrics.empty
+    val taskMetrics = TaskMetrics.registered
     val task = new Task[Int](0, 0, 0) {
       context = new TaskContextImpl(0, 0, 0L, 0,
         new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 93964a2d5674335cd05b652d9fe039eb8e618bc9..48be3be81755af54fcf15f5d7cff56b2cb18c167 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -293,7 +293,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val execId = "exe-1"
 
     def makeTaskMetrics(base: Int): TaskMetrics = {
-      val taskMetrics = TaskMetrics.empty
+      val taskMetrics = TaskMetrics.registered
       val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
       val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
       val inputMetrics = taskMetrics.inputMetrics
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a64dbeae472946b7562aaa4c9ea9219c84bde69a..a77c8e3cab4e8900fbadff7c11b3276c2e810f08 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -830,7 +830,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       hasHadoopInput: Boolean,
       hasOutput: Boolean,
       hasRecords: Boolean = true) = {
-    val t = TaskMetrics.empty
+    val t = TaskMetrics.registered
     // Set CPU times same as wall times for testing purpose
     t.setExecutorDeserializeTime(a)
     t.setExecutorDeserializeCpuTime(a)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index eb97118872ea1d53d5973661416181e0c800e13e..0bab321a657d647a55987f32aba95250eb994a0c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -153,14 +153,14 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
     }
 
     // For test purpose.
-    // If the predefined accumulator exists, the row group number to read will be updated
-    // to the accumulator. So we can check if the row groups are filtered or not in test case.
+    // If the last external accumulator is a `NumRowGroupsAcc`, the number of row groups to read
+    // will be added to that accumulator, so the test case can check whether row groups were
+    // filtered out or not.
     TaskContext taskContext = TaskContext$.MODULE$.get();
     if (taskContext != null) {
-      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics()
-        .lookForAccumulatorByName("numRowGroups");
-      if (accu.isDefined()) {
-        ((LongAccumulator)accu.get()).add((long)blocks.size());
+      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
+      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
+        ((AccumulatorV2<Integer, Integer>)accu.get()).add(blocks.size());
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 9a3328fcecee8c2b973a851973933e0b911e76d0..dd53b561326f39bf2e889ce7141d96424f0c9b9c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{AccumulatorContext, LongAccumulator}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
 
 /**
  * A test suite that tests Parquet filter2 API based filter pushdown optimization.
@@ -499,18 +499,20 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val path = s"${dir.getCanonicalPath}/table"
         (1 to 1024).map(i => (101, i)).toDF("a", "b").write.parquet(path)
 
-        Seq(("true", (x: Long) => x == 0), ("false", (x: Long) => x > 0)).map { case (push, func) =>
-          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> push) {
-            val accu = new LongAccumulator
-            accu.register(sparkContext, Some("numRowGroups"))
+        Seq(true, false).foreach { enablePushDown =>
+          withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> enablePushDown.toString) {
+            val accu = new NumRowGroupsAcc
+            sparkContext.register(accu)
 
             val df = spark.read.parquet(path).filter("a < 100")
             df.foreachPartition(_.foreach(v => accu.add(0)))
             df.collect
 
-            val numRowGroups = AccumulatorContext.lookForAccumulatorByName("numRowGroups")
-            assert(numRowGroups.isDefined)
-            assert(func(numRowGroups.get.asInstanceOf[LongAccumulator].value))
+            if (enablePushDown) {
+              assert(accu.value == 0)
+            } else {
+              assert(accu.value > 0)
+            }
             AccumulatorContext.remove(accu.id)
           }
         }
@@ -537,3 +539,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }
 }
+
+class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
+  private var _sum = 0
+
+  override def isZero: Boolean = _sum == 0
+
+  override def copy(): AccumulatorV2[Integer, Integer] = {
+    val acc = new NumRowGroupsAcc()
+    acc._sum = _sum
+    acc
+  }
+
+  override def reset(): Unit = _sum = 0
+
+  override def add(v: Integer): Unit = _sum += v
+
+  override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other match {
+    case a: NumRowGroupsAcc => _sum += a._sum
+    case _ => throw new UnsupportedOperationException(
+      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
+  }
+
+  override def value: Integer = _sum
+}
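
For completeness, a hedged usage sketch of `NumRowGroupsAcc`: it is registered without a name, and `SpecificParquetRecordReaderBase` recognizes it purely by its class name as the last external accumulator attached to the task. The snippet assumes a spark-shell session (so `sc` is predefined) and is illustrative only, not part of the patch:

```scala
// Illustrative only: register the accumulator, let tasks add to it, read it on the driver.
val acc = new NumRowGroupsAcc
sc.register(acc) // unnamed; the Parquet reader matches on the class's simple name instead
sc.parallelize(1 to 4, numSlices = 2).foreach(_ => acc.add(1))
assert(acc.value == 4) // the two tasks' partial sums are merged back on the driver
```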