diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
index c72322e9c2e156f99acf70fcf1c93291879e4020..f0b1f777fde557837ea416d69920ee8481554709 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
@@ -31,12 +31,13 @@ class ApplicationSource(val application: ApplicationInfo) extends Source {
     override def getValue: String = application.state.toString
   })
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), new Gauge[Long] {
-    override def getValue: Long = application.duration
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("runtime", "ms")), 
+    new Gauge[Long] { override def getValue: Long = application.duration })
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), new Gauge[Int] {
-    override def getValue: Int = application.coresGranted
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("cores", "number")), 
+    new Gauge[Int] { override def getValue: Int = application.coresGranted })
 
 }
+
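
For context, `NamingConventions.makeMetricName` itself is not part of this diff. A minimal sketch of what such a helper could look like, assuming it merely joins a value name and its unit/qualifier into one metric-name segment (the object's location and the separator are assumptions, not the actual implementation):

```scala
package org.apache.spark.metrics

// Hypothetical sketch -- the real helper is defined outside this diff.
// Joins a metric's value name and its qualifiers (e.g. "runtime", "ms")
// into a single dotted segment such as "runtime.ms".
private[spark] object NamingConventions {
  def makeMetricName(name: String, qualifiers: String*): String =
    (name +: qualifiers).filter(_.nonEmpty).mkString(".")
}
```
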
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
index de3939836bf182c345b408001e66192464820f47..8a88fef330a6429605d5c83071f56d2f795adc07 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterSource.scala
@@ -27,17 +27,18 @@ private[spark] class MasterSource(val master: Master) extends Source {
   val sourceName = "master"
 
   // Gauge for the number of workers in the cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), new Gauge[Int] {
-    override def getValue: Int = master.workers.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("workers","number")), 
+    new Gauge[Int] { override def getValue: Int = master.workers.size })
 
   // Gauge for the number of applications in the cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), new Gauge[Int] {
-    override def getValue: Int = master.apps.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("apps", "number")), 
+    new Gauge[Int] { override def getValue: Int = master.apps.size })
 
   // Gauge for the number of waiting applications in the cluster
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), new Gauge[Int] {
-    override def getValue: Int = master.waitingApps.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("waitingApps", "number")), 
+    new Gauge[Int] { override def getValue: Int = master.waitingApps.size })
 }
+
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
index fc4f4ae99c4c7cb264eb04078338971d0646851a..0596f14355f8dcdbce3b0b71392e31c88146c413 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerSource.scala
@@ -26,27 +26,28 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), new Gauge[Int] {
-    override def getValue: Int = worker.executors.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("executors", "number")), 
+    new Gauge[Int] { override def getValue: Int = worker.executors.size })
 
   // Gauge for the number of cores used by this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), new Gauge[Int] {
-    override def getValue: Int = worker.coresUsed
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("coresUsed", "number")), 
+    new Gauge[Int] { override def getValue: Int = worker.coresUsed })
 
   // Gauge for the amount of memory used by this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Int] {
-    override def getValue: Int = worker.memoryUsed
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("memUsed", "MBytes")), 
+    new Gauge[Int] { override def getValue: Int = worker.memoryUsed })
 
   // Gauge for the number of cores free on this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), new Gauge[Int] {
-    override def getValue: Int = worker.coresFree
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("coresFree", "number")), 
+    new Gauge[Int] { override def getValue: Int = worker.coresFree })
 
   // Gauge for the amount of memory free on this worker
-  metricRegistry.register(MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), new Gauge[Int] {
-    override def getValue: Int = worker.memoryFree
-  })
+  metricRegistry.register(
+    MetricRegistry.name(NamingConventions.makeMetricName("memFree", "MBytes")), 
+    new Gauge[Int] { override def getValue: Int = worker.memoryFree })
 }
+
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index 6cbd15460332273db1e976a5bf8248a2d87f77b4..d063e4ad0b514cb8e268e6ade41824c7f3fdd332 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -44,31 +44,42 @@ class ExecutorSource(val executor: Executor, executorId: String) extends Source
   val sourceName = "executor.%s".format(executorId)
 
   // Gauge for the number of tasks currently executing in the executor's thread pool
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getActiveCount()
-  })
+  metricRegistry.register(
+    MetricRegistry.name("threadpool", NamingConventions.makeMetricName("activeTask", "count")), 
+    new Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), new Gauge[Long] {
-    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
-  })
+  metricRegistry.register(
+    MetricRegistry.name("threadpool", NamingConventions.makeMetricName("completeTask", "count")), 
+    new Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() })
 
   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getPoolSize()
-  })
+  metricRegistry.register(
+    MetricRegistry.name("threadpool", NamingConventions.makeMetricName("currentPool", "size")), 
+    new Gauge[Int] { override def getValue: Int = executor.threadPool.getPoolSize() })
 
   // Gauge for the executor thread pool's largest number of threads that have ever been in the pool simultaneously
-  metricRegistry.register(MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), new Gauge[Int] {
-    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
-  })
+  metricRegistry.register(
+    MetricRegistry.name("threadpool", NamingConventions.makeMetricName("maxPool", "size")), 
+    new Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() })
 
   // Gauge for file system stats of this executor
   for (scheme <- Array("hdfs", "file")) {
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "bytes"), _.getBytesRead(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "bytes"), _.getBytesWritten(), 0L)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("read", "ops"), _.getReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("largeRead", "ops"), _.getLargeReadOps(), 0)
-    registerFileSystemStat(scheme, NamingConventions.makeMetricName("write", "ops"), _.getWriteOps(), 0)
+    registerFileSystemStat(scheme, 
+      NamingConventions.makeMetricName("read", "bytes"),
+      _.getBytesRead(), 0L)
+    registerFileSystemStat(scheme, 
+      NamingConventions.makeMetricName("write", "bytes"), 
+      _.getBytesWritten(), 0L)
+    registerFileSystemStat(scheme, 
+      NamingConventions.makeMetricName("read", "ops"), 
+      _.getReadOps(), 0)
+    registerFileSystemStat(scheme, 
+      NamingConventions.makeMetricName("largeRead", "ops"), 
+      _.getLargeReadOps(), 0)
+    registerFileSystemStat(scheme, 
+      NamingConventions.makeMetricName("write", "ops"), 
+      _.getWriteOps(), 0)
   }
 }
+
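
The loop above relies on `registerFileSystemStat`, which is defined earlier in ExecutorSource.scala and so falls outside this hunk. A rough reconstruction of such a helper, assuming it reads Hadoop's per-scheme `FileSystem.Statistics` (the wrapper class and exact signature here are illustrative, not the actual source):

```scala
import scala.collection.JavaConverters._
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem

// Hypothetical stand-in for the surrounding ExecutorSource class.
class FileSystemStatSource {
  val metricRegistry = new MetricRegistry()

  // Look up Hadoop's accumulated statistics for one URI scheme, if any.
  private def fileStats(scheme: String): Option[FileSystem.Statistics] =
    FileSystem.getAllStatistics().asScala.find(_.getScheme == scheme)

  // Register a gauge that reads one statistic, falling back to a default
  // when no FileSystem of that scheme has been touched yet.
  protected def registerFileSystemStat[T](
      scheme: String,
      name: String,
      f: FileSystem.Statistics => T,
      defaultValue: T): Unit = {
    metricRegistry.register(
      MetricRegistry.name("filesystem", scheme, name),
      new Gauge[T] { override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) })
  }
}
```
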
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 9e90a08411d414ce661a563ba4f073a59c31e061..02fb80760f530d8d61411463d4679627759f4131 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -28,23 +28,24 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.failed.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name("stage", NamingConventions.makeMetricName("failedStages", "number")), 
+    new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.running.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name("stage", NamingConventions.makeMetricName("runningStages", "number")), 
+    new Gauge[Int] { override def getValue: Int = dagScheduler.running.size })
 
-  metricRegistry.register(MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.waiting.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name("stage", NamingConventions.makeMetricName("waitingStages", "number")), 
+    new Gauge[Int] { override def getValue: Int = dagScheduler.waiting.size })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.nextJobId.get()
-  })
+  metricRegistry.register(
+    MetricRegistry.name("job", NamingConventions.makeMetricName("allJobs", "number")), 
+    new Gauge[Int] { override def getValue: Int = dagScheduler.nextJobId.get() })
 
-  metricRegistry.register(MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.activeJobs.size
-  })
+  metricRegistry.register(
+    MetricRegistry.name("job", NamingConventions.makeMetricName("activeJobs", "number")), 
+    new Gauge[Int] { override def getValue: Int = dagScheduler.activeJobs.size })
 }
+
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 43122502401720e8136d44b0990ce0b684d606ae..fcf9da481b8a0757ad90e9a1eb2a05763763f698 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -29,40 +29,48 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: Spar
   val metricRegistry = new MetricRegistry()
   val sourceName = "%s.BlockManager".format(sc.appName)
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      maxMem / 1024 / 1024
-    }
+  metricRegistry.register(
+    MetricRegistry.name("memory", NamingConventions.makeMetricName("maxMem", "MBytes")), 
+    new Gauge[Long] {
+      override def getValue: Long = {
+        val storageStatusList = blockManager.master.getStorageStatus
+        val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+        maxMem / 1024 / 1024
+      }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
-      remainingMem / 1024 / 1024
-    }
+  metricRegistry.register(
+    MetricRegistry.name("memory", NamingConventions.makeMetricName("remainingMem", "MBytes")), 
+    new Gauge[Long] {
+      override def getValue: Long = {
+        val storageStatusList = blockManager.master.getStorageStatus
+        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+        remainingMem / 1024 / 1024
+      }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
-      (maxMem - remainingMem) / 1024 / 1024
-    }
+  metricRegistry.register(
+    MetricRegistry.name("memory", NamingConventions.makeMetricName("memUsed", "MBytes")), 
+    new Gauge[Long] {
+      override def getValue: Long = {
+        val storageStatusList = blockManager.master.getStorageStatus
+        val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+        val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+        (maxMem - remainingMem) / 1024 / 1024
+      }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")), new Gauge[Long] {
-    override def getValue: Long = {
-      val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList
-      	.flatMap(_.blocks.values.map(_.diskSize))
-      	.reduceOption(_ + _)
-      	.getOrElse(0L)
-
-      diskSpaceUsed / 1024 / 1024
-    }
+  metricRegistry.register(
+    MetricRegistry.name("disk", NamingConventions.makeMetricName("diskSpaceUsed", "MBytes")),
+    new Gauge[Long] {
+      override def getValue: Long = {
+        val storageStatusList = blockManager.master.getStorageStatus
+        val diskSpaceUsed = storageStatusList
+          .flatMap(_.blocks.values.map(_.diskSize))
+          .reduceOption(_ + _)
+          .getOrElse(0L)
+        diskSpaceUsed / 1024 / 1024
+      }
   })
 }
+
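
To see how the pieces compose: Codahale's `MetricRegistry.name` joins its non-empty arguments with dots, so a prefix plus a `makeMetricName`-style segment yields names like `memory.maxMem.MBytes`, and Spark's metrics system then prefixes the source name (e.g. `master.workers.number`). A tiny self-contained demo of that composition (all names here are illustrative):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

object MetricNameDemo extends App {
  val registry = new MetricRegistry()

  // "memory" + "maxMem.MBytes" -> "memory.maxMem.MBytes"
  val name = MetricRegistry.name("memory", "maxMem.MBytes")
  registry.register(name, new Gauge[Long] { override def getValue: Long = 42L })

  println(registry.getGauges.keySet) // prints [memory.maxMem.MBytes]
}
```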