From 7fb574bf666661fdf8a786de779f85efe2f15f0c Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Fri, 28 Jun 2013 10:14:30 +0800
Subject: [PATCH] Clean up and restructure the metrics code

Move the master MetricsSystem's start()/stop() calls out of Master.main()
and into the Master actor's preStart()/postStop() hooks, drop unused
imports and fields from the master and worker instrumentation sources,
and remove commented-out code and stray blank lines.

---
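Notes (placed after the "---" so git-am drops them): the core change ties
the metrics system's lifetime to the Master actor's own lifecycle instead
of driving it from main(). Below is a minimal, self-contained sketch of
that pattern against the Akka API of this era; MetricsLike and MasterLike
are illustrative stand-ins, not Spark classes.

import akka.actor.{Actor, ActorSystem, Props}

// Illustrative stand-in for spark.metrics.MetricsSystem (not Spark code).
class MetricsLike {
  def start() { println("metrics started") }
  def stop() { println("metrics stopped") }
}

// Mirrors the Master change: start the metrics system in preStart() and
// stop it in postStop(), so its lifetime follows the actor's.
class MasterLike extends Actor {
  val metrics = new MetricsLike

  override def preStart() {
    metrics.start()  // was previously started from main()
  }

  override def postStop() {
    metrics.stop()   // was previously stopped from main()
  }

  def receive = {
    case msg => println("got " + msg)
  }
}

object LifecycleSketch extends App {
  val system = ActorSystem("sketch")
  val master = system.actorOf(Props[MasterLike], "master")
  master ! "ping"
  system.shutdown()  // stopping the system runs postStop(), stopping metrics
}
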
 .../scala/spark/deploy/master/Master.scala     |  6 ++----
 .../deploy/master/MasterInstrumentation.scala  | 11 ++---------
 .../deploy/worker/WorkerInstrumentation.scala  | 20 +++++++-------------
 .../scala/spark/metrics/MetricsSystem.scala    |  5 ++---
 .../scala/spark/metrics/sink/ConsoleSink.scala |  1 -
 .../scala/spark/metrics/sink/CsvSink.scala     |  1 -
 .../main/scala/spark/metrics/sink/Sink.scala   |  1 -
 7 files changed, 13 insertions(+), 32 deletions(-)
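
The instrumentation classes now carry only a sourceName plus their own
MetricRegistry, and MetricsSystem.registerSource() nests that registry
into the system-wide one under the source name (a MetricRegistry is a
MetricSet, so metric names get prefixed, e.g. "master.workers.number").
A self-contained sketch of that nesting with the Codahale metrics 3.x
API follows; the gauge value and the one-shot console dump are
illustrative, not part of this patch.

import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, Gauge, MetricRegistry}

object RegistrySketch {
  def main(args: Array[String]) {
    // Per-source registry, as built by MasterInstrumentation.
    val source = new MetricRegistry()
    source.register(MetricRegistry.name("workers", "number"),
      new Gauge[Int] {
        override def getValue: Int = 42  // placeholder for workers.size
      })

    // System-wide registry, as held by MetricsSystem. Registering a
    // whole registry nests its metrics under the given source name.
    val parent = new MetricRegistry()
    parent.register("master", source)

    // One-shot console dump, standing in for ConsoleSink's polling.
    val reporter = ConsoleReporter.forRegistry(parent)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()
    reporter.report()
  }
}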

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index e44f5e3168..cc0b2d4295 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -78,6 +78,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
     
     Master.metricsSystem.registerSource(masterInstrumentation)
+    Master.metricsSystem.start()
   }
 
   override def postStop() {
+    Master.metricsSystem.stop()
@@ -321,22 +323,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       removeWorker(worker)
     }
   }
 }
 
 private[spark] object Master {
   private val systemName = "sparkMaster"
   private val actorName = "Master"
   private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
-  
   private val metricsSystem = MetricsSystem.createMetricsSystem("master")
 
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
-    
-    metricsSystem.start()
     actorSystem.awaitTermination()
-    metricsSystem.stop()
   }
 
   /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`. */
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 46c90b94d2..61a561c955 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -1,19 +1,12 @@
 package spark.deploy.master
 
-import java.util.{Map, HashMap => JHashMap} 
-
-import com.codahale.metrics.{Gauge, Metric}
-
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Gauge, MetricRegistry}
 
 import spark.metrics.source.Source
-import spark.Logging
 
 private[spark] class MasterInstrumentation(val master: Master) extends Source {
-  val className = classOf[Master].getName()
-  val instrumentationName = "master"
   val metricRegistry = new MetricRegistry()    
-  val sourceName = instrumentationName
+  val sourceName = "master"
 
   metricRegistry.register(MetricRegistry.name("workers","number"), 
        new Gauge[Int] {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 5ce29cf04c..94c20a98c1 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -1,44 +1,38 @@
 package spark.deploy.worker
 
-import com.codahale.metrics.{Gauge, Metric}
+import com.codahale.metrics.{Gauge, MetricRegistry}
 
-import java.util.{Map, HashMap => JHashMap}
-
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
 import spark.metrics.source.Source
 
 private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
-  val className = classOf[Worker].getName()
-  
   val sourceName = "worker"
-  
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"), 
+  metricRegistry.register(MetricRegistry.name("executor", "number"), 
     new Gauge[Int] {
       override def getValue: Int = worker.executors.size
   })
   
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"), 
+  metricRegistry.register(MetricRegistry.name("core_used", "number"), 
     new Gauge[Int] {
       override def getValue: Int = worker.coresUsed
   })
   
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"), 
+  metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), 
     new Gauge[Int] {
       override def getValue: Int = worker.memoryUsed
   })
   
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"), 
+  metricRegistry.register(MetricRegistry.name("core_free", "number"), 
     new Gauge[Int] {
       override def getValue: Int = worker.coresFree
   })
   
-  // Gauge for memory used of this worker
+  // Gauge for memory free of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"), 
+  metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), 
     new Gauge[Int] {
       override def getValue: Int = worker.memoryFree
     })
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index a23ccd2692..5bfdc00eaf 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -5,7 +5,6 @@ import scala.collection.mutable
 import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
 
 import java.util.Properties
-//import java.util._
 import java.util.concurrent.TimeUnit
 
 import spark.Logging
@@ -20,7 +19,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
   
   val sinks = new mutable.ArrayBuffer[Sink]
   val sources = new mutable.ArrayBuffer[Source]
-  var registry = new MetricRegistry()
+  val registry = new MetricRegistry()
   
   registerSources()
   registerSinks()
@@ -35,7 +34,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
   
   def registerSource(source: Source) {
     sources += source
-    registry.register(source.sourceName,source.metricRegistry)
+    registry.register(source.sourceName, source.metricRegistry)
   }
   
   def registerSources() {
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 9cd17556fa..e2e4197d1d 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -23,7 +23,6 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
       .convertRatesTo(TimeUnit.SECONDS)
       .build()
 
-  
   override def start() {
     reporter.start(pollPeriod, pollUnit)  
   }
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 62e51be0dc..c2d645331c 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -30,7 +30,6 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
       .convertRatesTo(TimeUnit.SECONDS)
       .build(new File(pollDir))
 
-  
   override def start() {
     reporter.start(pollPeriod, pollUnit)  
   }
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 9fef894fde..3ffdcbdaba 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -2,6 +2,5 @@ package spark.metrics.sink
 
 trait Sink {
   def start: Unit
-  
   def stop: Unit
 }
\ No newline at end of file
-- 
GitLab