diff --git a/conf/metrics.properties b/conf/metrics.properties
index 78749cf3815ffd178ba5ab1b68030f23340e714c..0bbb6b5229ea21b1e292ffb8f17bfe9e73600b67 100644
--- a/conf/metrics.properties
+++ b/conf/metrics.properties
@@ -1,9 +1,11 @@
-# syntax: [prefix].[sink].[instance].[options]
+# syntax: [prefix].[sink|source].[instance].[options]
 
 *.sink.console.period=10
 
 *.sink.console.unit=second
 
+master.source.jvm.class=spark.metrics.source.JvmSource
+
 master.sink.console.period=10
 
 master.sink.console.unit=second
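
For reference, this is how the new source line decomposes under the documented syntax (annotations are illustrative, the entry itself comes straight from this patch):

```properties
# prefix      = master  (which instrumentation instance this applies to)
# sink|source = source
# instance    = jvm     (a name for this source)
# options     = class   (the Source implementation to instantiate)
master.source.jvm.class=spark.metrics.source.JvmSource
```
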
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index 13088189a4b90df4be88f7782a5e37316d194540..c295e725d7151db9d9c3eac3789b4dd5f3d6a55c 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -15,12 +15,15 @@ private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
   def initialize(master: Master) {
     masterInst = Some(master)
     
+    // Register all the sources
+    registerSources()
+    
     // Register and start all the sinks
-    registerSinks
+    registerSinks()
   }
   
   def uninitialize() {
-    unregisterSinks
+    unregisterSinks()
   }
   
   // Gauge for worker numbers in cluster
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 04c43ce33b82f445191c3f0d1445c29eb5deba83..2f725300b5cbd1eb7f55c2cc05e10dae4b604612 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -15,6 +15,9 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
   def initialize(worker: Worker) {
     workerInst = Some(worker)
     
+    // Register all the sources
+    registerSources()
+    
     // Register and start all the sinks
     registerSinks()
   }
@@ -36,7 +39,7 @@ private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
   })
   
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "Mbytes"), 
+  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"), 
     new Gauge[Int] {
       override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
   })
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
index 0fed6084888a9dae56bdf7030564a13c6412eeba..9cae1e0220a1bfb188e706c3db2fb060d3a08579 100644
--- a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
+++ b/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
@@ -9,17 +9,39 @@ import java.util.concurrent.TimeUnit
 
 import spark.Logging
 import spark.metrics.sink._
+import spark.metrics.source._
 
-trait AbstractInstrumentation extends Logging {
+private[spark] trait AbstractInstrumentation extends Logging {
   initLogging()
   
+  // Get MetricRegistry handler
   def registryHandler: MetricRegistry
+  // Get the instance name
   def instance: String
   
   val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
   val metricsConfig = new MetricsConfig(confFile)
   
   val sinks = new mutable.ArrayBuffer[Sink]
+  val sources = new mutable.ArrayBuffer[Source]
+  
+  def registerSources() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+    
+    // Instantiate the configured sources via reflection
+    sourceConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
+          .newInstance(registryHandler)
+        sources += source.asInstanceOf[Source]
+      } catch {
+        case e: Exception => logError("source class " + classPath + " cannot be instantiated", e)
+      }
+    }
+    sources.foreach(_.registerSource)
+  }
   
   def registerSinks() {
     val instConfig = metricsConfig.getInstance(instance)
@@ -33,6 +55,7 @@ trait AbstractInstrumentation extends Logging {
       val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
         AbstractInstrumentation.DEFAULT_SINKS(kv._1)
       } else {
+        // For a non-default sink, the "class" property must be set; the sink is created via reflection
         kv._2.getProperty("class")
       }
       try {
@@ -40,10 +63,9 @@ trait AbstractInstrumentation extends Logging {
           .newInstance(kv._2, registryHandler)
         sinks += sink.asInstanceOf[Sink]
       } catch {
-        case e: Exception => logError("class " + classPath + "cannot be instantialize", e)
+        case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
       }
     }
-    
     sinks.foreach(_.registerSink)
   }
   
@@ -58,6 +80,7 @@ object AbstractInstrumentation {
       "csv" -> "spark.metrics.sink.CsvSink")
       
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
   
   val timeUnits = Map(
       "millisecond" -> TimeUnit.MILLISECONDS,
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 0fec1988ea39e6accc510b1962822221321e8cbc..be4f670918ae4d7ce72e2720f40f14dfcb28bce7 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -6,7 +6,7 @@ import java.io.FileInputStream
 import scala.collection.mutable
 import scala.util.matching.Regex
 
-class MetricsConfig(val configFile: String) {
+private[spark] class MetricsConfig(val configFile: String) {
   val properties = new Properties()
   var fis: FileInputStream = _
   
@@ -36,7 +36,7 @@ class MetricsConfig(val configFile: String) {
 }
 
 object MetricsConfig {
-  val DEFAULT_CONFIG_FILE = "/home/jerryshao/project/sotc_cloud-spark/conf/metrics.properties"
+  val DEFAULT_CONFIG_FILE = "conf/metrics.properties"
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
   
@@ -45,9 +45,11 @@ object MetricsConfig {
     
     import scala.collection.JavaConversions._
     prop.foreach { kv => 
-      val regex(a, b) = kv._1
-      subProperties.getOrElseUpdate(a, new Properties).setProperty(b, kv._2)
-      println(">>>>>subProperties added  " + a + " " + b + " " + kv._2)
+      if (regex.findPrefixOf(kv._1).isDefined) {
+        val regex(prefix, suffix) = kv._1
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+        println(">>>>>subProperties added  " + prefix + " " + suffix + " " + kv._2)
+      }
     }
     
     subProperties
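
The guard is the substantive fix here: a key that did not match the regex previously blew up the extractor with a `scala.MatchError`. A worked sketch of what `subProperties` now produces, assuming the snippet runs where the `spark.metrics` members are visible:

```scala
import java.util.Properties

import spark.metrics.{AbstractInstrumentation, MetricsConfig}

val prop = new Properties()
prop.setProperty("sink.console.period", "10")
prop.setProperty("sink.console.unit", "second")
prop.setProperty("source.jvm.class", "spark.metrics.source.JvmSource")

// SINK_REGEX groups matching keys by their first capture:
//   "console" -> {period=10, unit=second}
// "source.jvm.class" has no "sink." prefix, so the guard skips it
// instead of throwing as the old code did.
val sinkConfigs = MetricsConfig.subProperties(prop, AbstractInstrumentation.SINK_REGEX)
```
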
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 3a80c36901441bd58dc80474701b9cd3ac26c6f2..1d663f6cffd0165c10a58fe04324e4a199211068 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -46,8 +46,8 @@ object CsvSink {
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
     
-  val CSV_DEFAULT_PERIOD = "1"
-  val CSV_DEFAULT_UNIT = "minute"
+  val CSV_DEFAULT_PERIOD = "10"
+  val CSV_DEFAULT_UNIT = "second"
   val CSV_DEFAULT_DIR = "/tmp/"
 }
 
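The CSV defaults now match the console sink's 10-second period. Per-instance overrides still work through the same option keys; an illustrative `conf/metrics.properties` snippet (the directory path is an example, not a default):

```properties
master.sink.csv.period=1
master.sink.csv.unit=minute
master.sink.csv.directory=/tmp/spark-metrics/
```
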
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 65ebcb4eac7b75ba7233faa0857be50fefd960e0..26052b7231b40dccac301e34a76c10a34c297f7e 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -2,5 +2,6 @@ package spark.metrics.sink
 
 trait Sink {
   def registerSink: Unit
+  
   def unregisterSink: Unit
 }
\ No newline at end of file
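
For a non-default sink, `registerSinks()` resolves the `class` property and invokes a `(Properties, MetricRegistry)` constructor reflectively, so a custom sink must implement this trait and expose that signature. A minimal sketch, assuming metrics-core's `ConsoleReporter` and a hypothetical `MySink` name (unit handling omitted for brevity):

```scala
package spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

// Hypothetical non-default sink, wired in with e.g.
//   master.sink.mysink.class=spark.metrics.sink.MySink
//   master.sink.mysink.period=10
class MySink(val property: Properties, val registry: MetricRegistry) extends Sink {
  val period = property.getProperty("period", "10").toInt

  val reporter = ConsoleReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

  override def registerSink() {
    reporter.start(period, TimeUnit.SECONDS)
  }

  override def unregisterSink() {
    reporter.stop()
  }
}
```
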
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..8f6bf48843acc0843c81b6058b41c02bdcdead3e
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,17 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+
+class JvmSource(registry: MetricRegistry) extends Source {
+  // Initialize memory usage gauge for jvm
+  val memUsageMetricSet = new MemoryUsageGaugeSet
+  
+  // Initialize garbage collection usage gauge for jvm
+  val gcMetricSet = new GarbageCollectorMetricSet
+  
+  override def registerSource() {
+    registry.registerAll(memUsageMetricSet)
+    registry.registerAll(gcMetricSet)
+  }
+}
\ No newline at end of file
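
For clarity, this is roughly what `registerSources()` does for the `master.source.jvm.class` entry above; a sketch of the reflective path, not new behavior, assuming `registryHandler` is the instance's `MetricRegistry` as in `AbstractInstrumentation`:

```scala
import com.codahale.metrics.MetricRegistry
import spark.metrics.source.Source

val source = Class.forName("spark.metrics.source.JvmSource")
  .getConstructor(classOf[MetricRegistry])
  .newInstance(registryHandler)
  .asInstanceOf[Source]
source.registerSource
```
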
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000000000000000000000000000000000..35cfe0c8ff1bfdde4b52203e5b6912346e74f9ca
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,5 @@
+package spark.metrics.source
+
+trait Source {
+  def registerSource: Unit
+}
\ No newline at end of file