diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index 5a24042e14b9829c5be90c4db7d7a57f260a60dc..c87b66f047dc85566f46c957789989aaa06deb0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -34,7 +34,7 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: Long = application.duration
   })
 
-  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] {
     override def getValue: Int = application.coresGranted
   })
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index 23d1cb77da6f2dd77f76ca15c0668fae965f9e91..36c1b87b7f684fb93cb2937a65d896332604fabf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -26,17 +26,17 @@ private[spark] class MasterSource(val master: Master) extends Source {
   val sourceName = "master"
 
   // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("workers"), new Gauge[Int] {
     override def getValue: Int = master.workers.size
   })
 
   // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("apps"), new Gauge[Int] {
     override def getValue: Int = master.apps.size
   })
 
   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index df269fd04777f3a4364b8bd767fbebe7dca13a4b..b24e936b1ff8e2b5503ae86a7f71f4a6489b03d5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -25,27 +25,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })
 
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })
 
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memUsed_MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryUsed
   })
 
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })
 
   // Gauge for memory free of this worker
-  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memFree_MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryFree
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 18c9dc1c0a9bbb5e974c49fa14c6c1cbae18f097..34ed9c8f73c01cdb5f8c5cd14e32b1df6b010b35 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -43,31 +43,31 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)
 
   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
   // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
-  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })
 
   // Gauge for file system stats of this executor
   for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, "bytesRead", _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, "bytesWritten", _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, "readOps", _.getReadOps(), 0)
-    registerFileSystemStat(scheme, "largeReadOps", _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, "writeOps", _.getWriteOps(), 0)
+    registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
+    registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 446d490cc9dde851046d50d61279d5129dd9b818..151514896f6fa2289bbae8a878772a333e9f55ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -27,23 +27,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextJobId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index acc3951088a8de54c9c28dbf0fd2b71287970342..e5068d5587c03eee6f1f94014dcdc8ae610f956d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -28,7 +28,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -36,7 +36,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
@@ -44,7 +44,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
@@ -53,7 +53,7 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed_MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
       val diskSpaceUsed = storageStatusList