From c3daad3f65630eb4ed536d06c0d467cde57a8142 Mon Sep 17 00:00:00 2001
From: jerryshao <saisai.shao@intel.com>
Date: Thu, 27 Jun 2013 12:00:19 +0800
Subject: [PATCH] Add metrics source support for instrumentation

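Extend the metrics system so that each instance can register metric
sources in addition to sinks. Sources are declared in
conf/metrics.properties with the same dotted syntax as sinks and are
instantiated via reflection from their "class" property, for example:

    master.source.jvm.class=spark.metrics.source.JvmSource

A new Source trait mirrors the existing Sink trait, and JvmSource
registers the Codahale MemoryUsageGaugeSet and
GarbageCollectorMetricSet with the instance's MetricRegistry. The
patch also replaces the hard-coded default config path with the
relative conf/metrics.properties, guards MetricsConfig.subProperties()
against keys that do not match the given regex (previously a
scala.MatchError), and changes the CsvSink default reporting period
from 1 minute to 10 seconds.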
---
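Note (not part of the commit): a user-defined source only needs a
public constructor taking a MetricRegistry, since registerSources()
instantiates it reflectively. A minimal sketch, assuming a hypothetical
MyAppSource class and "myapp" metric names:

    package spark.metrics.source

    import com.codahale.metrics.{Gauge, MetricRegistry}

    // Hypothetical source; it would be enabled per instance with
    //   <instance>.source.myapp.class=spark.metrics.source.MyAppSource
    class MyAppSource(registry: MetricRegistry) extends Source {
      private val startTime = System.currentTimeMillis()

      override def registerSource() {
        // Expose the time elapsed since this source was created,
        // under the metric name "myapp.uptime.ms"
        registry.register(MetricRegistry.name("myapp", "uptime", "ms"),
          new Gauge[Long] {
            override def getValue: Long =
              System.currentTimeMillis() - startTime
          })
      }
    }
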
 conf/metrics.properties                       |  4 ++-
 .../deploy/master/MasterInstrumentation.scala |  7 +++--
 .../deploy/worker/WorkerInstrumentation.scala |  5 +++-
 .../metrics/AbstractInstrumentation.scala     | 29 +++++++++++++++++--
 .../scala/spark/metrics/MetricsConfig.scala   | 12 ++++----
 .../scala/spark/metrics/sink/CsvSink.scala    |  4 +--
 .../main/scala/spark/metrics/sink/Sink.scala  |  1 +
 .../spark/metrics/source/JvmSource.scala      | 17 +++++++++++
 .../scala/spark/metrics/source/Source.scala   |  5 ++++
 9 files changed, 70 insertions(+), 14 deletions(-)
 create mode 100644 core/src/main/scala/spark/metrics/source/JvmSource.scala
 create mode 100644 core/src/main/scala/spark/metrics/source/Source.scala

diff --git a/conf/metrics.properties b/conf/metrics.properties
index 78749cf381..0bbb6b5229 100644
--- a/conf/metrics.properties
+++ b/conf/metrics.properties
@@ -1,9 +1,11 @@
-# syntax: [prefix].[sink].[instance].[options]
+# syntax: [prefix].[sink|source].[instance].[options]
 
 *.sink.console.period=10
 
 *.sink.console.unit=second
 
+master.source.jvm.class=spark.metrics.source.JvmSource
+
 master.sink.console.period=10
 
 master.sink.console.unit=second
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 13088189a4..c295e725d7 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -15,12 +15,15 @@ private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
   def initialize(master: Master) {
     masterInst = Some(master)
     
+    // Register all the sources
+    registerSources()
+    
     // Register and start all the sinks
-    registerSinks
+    registerSinks()
   }
   
   def uninitialize() {
-    unregisterSinks
+    unregisterSinks()
   }
   
   // Gauge for worker numbers in cluster
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 04c43ce33b..2f725300b5 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -15,6 +15,9 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
   def initialize(worker: Worker) {
     workerInst = Some(worker)
     
+    // Register all the sources
+    registerSources()
+    
     // Register and start all the sinks
     registerSinks()
   }
@@ -36,7 +39,7 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
   })
   
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"), 
+  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"), 
     new Gauge[Int] {
       override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
   })
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
index 0fed608488..9cae1e0220 100644
--- a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
+++ b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
@@ -9,17 +9,39 @@ import java.util.concurrent.TimeUnit
 
 import spark.Logging
 import spark.metrics.sink._
+import spark.metrics.source._
 
-trait AbstractInstrumentation extends Logging {
+private[spark] trait AbstractInstrumentation extends Logging {
   initLogging()
   
+  // Get the MetricRegistry handler
   def registryHandler: MetricRegistry
+  // Get the instance name
   def instance: String
   
   val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
   val metricsConfig = new MetricsConfig(confFile)
   
   val sinks = new mutable.ArrayBuffer[Sink]
+  val sources = new mutable.ArrayBuffer[Source]
+  
+  def registerSources() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+    
+    // Instantiate the configured sources via reflection
+    sourceConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
+          .newInstance(registryHandler)
+        sources += source.asInstanceOf[Source]
+      } catch {
+        case e: Exception => logError("source class " + classPath + " cannot be instantiated", e)
+      }
+    }
+    sources.foreach(_.registerSource)
+  }
   
   def registerSinks() {
     val instConfig = metricsConfig.getInstance(instance)
@@ -33,6 +55,7 @@ trait AbstractInstrumentation extends Logging {
       val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
         AbstractInstrumentation.DEFAULT_SINKS(kv._1)
       } else {
+        // For a non-default sink, the "class" property must be set; the sink is created via reflection
         kv._2.getProperty("class")
       }
       try {
@@ -40,10 +63,9 @@ trait AbstractInstrumentation extends Logging {
           .newInstance(kv._2, registryHandler)
         sinks += sink.asInstanceOf[Sink]
       } catch {
-        case e: Exception => logError("class " + classPath + "cannot be instantialize", e)
+        case e: Exception => logError("sink class " + classPath + " cannot be instantiated", e)
       }
     }
-    
     sinks.foreach(_.registerSink)
   }
   
@@ -58,6 +80,7 @@ object AbstractInstrumentation {
       "csv" -> "spark.metrics.sink.CsvSink")
       
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
   
   val timeUnits = Map(
       "millisecond" -> TimeUnit.MILLISECONDS,
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 0fec1988ea..be4f670918 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -6,7 +6,7 @@ import java.io.FileInputStream
 import scala.collection.mutable
 import scala.util.matching.Regex
 
-class MetricsConfig(val configFile: String) {
+private[spark] class MetricsConfig(val configFile: String) {
   val properties = new Properties()
   var fis: FileInputStream = _
   
@@ -36,7 +36,7 @@ class MetricsConfig(val configFile: String) {
 }
 
 object MetricsConfig {
-  val DEFAULT_CONFIG_FILE = "/home/jerryshao/project/sotc_cloud-spark/conf/metrics.properties"
+  val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
   
@@ -45,9 +45,11 @@ object MetricsConfig {
     
     import scala.collection.JavaConversions._
     prop.foreach { kv => 
-      val regex(a, b) = kv._1
-      subProperties.getOrElseUpdate(a, new Properties).setProperty(b, kv._2)
-      println(">>>>>subProperties added  " + a + " " + b + " " + kv._2)
+      if (regex.findPrefixOf(kv._1).isDefined) {
+        val regex(prefix, suffix) = kv._1
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+        println("subProperties added: " + prefix + " " + suffix + " " + kv._2)
+      }
     }
     
     subProperties
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 3a80c36901..1d663f6cff 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -46,8 +46,8 @@ object CsvSink {
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
     
-  val CSV_DEFAULT_PERIOD = "1"
-  val CSV_DEFAULT_UNIT = "minute"
+  val CSV_DEFAULT_PERIOD = "10"
+  val CSV_DEFAULT_UNIT = "second"
   val CSV_DEFAULT_DIR = "/tmp/"
 }
 
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 65ebcb4eac..26052b7231 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -2,5 +2,6 @@ package spark.metrics.sink
 
 trait Sink {
   def registerSink: Unit
+  
   def unregisterSink: Unit
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000..8f6bf48843
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,17 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+
+class JvmSource(registry: MetricRegistry) extends Source {
+  // Gauge set for JVM memory usage
+  val memUsageMetricSet = new MemoryUsageGaugeSet
+  
+  // Gauge set for JVM garbage collection
+  val gcMetricSet = new GarbageCollectorMetricSet
+  
+  override def registerSource() {
+    registry.registerAll(memUsageMetricSet)
+    registry.registerAll(gcMetricSet)
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000..35cfe0c8ff
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,5 @@
+package spark.metrics.source
+
+trait Source {
+  def registerSource: Unit
+}
\ No newline at end of file
-- 
GitLab