diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index c7e24aa36cb800ad6cc446fa561368dcea1ea3bd..0486ca4c79213edb35f8f56a3123deecd289e029 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -1,11 +1,82 @@
 # syntax: [instance].[sink|source].[name].[options]
 
+#  "instance" specifies "who" (the role) uses the metrics system. In Spark
+#  there are several roles, such as master, worker, executor and driver; each
+#  of them creates its own metrics system for monitoring, so instance
+#  represents one of these roles. Currently the following instances are
+#  implemented: master, worker, executor, driver.
+#
+#  The [instance] field can be "master", "worker", "executor" or "driver",
+#  which means only the specified instance has this property.
+#  A wildcard "*" can be used in place of the instance name, which means all
+#  instances will have this property.
+#
+#  "source" specifies "where" (the source) metrics data is collected from.
+#  The metrics system has two kinds of source:
+#    1. Spark internal sources, like MasterSource, WorkerSource, etc., which
+#    collect a Spark component's internal state. These sources are tied to an
+#    instance and are added after the specific metrics system is created.
+#    2. Common sources, like JvmSource, which collect low-level state and are
+#    set up via configuration and loaded through reflection.
+#
+#  "sink" specifies "where" (the destination) metrics data is output to.
+#  Several sinks can coexist, and metrics are flushed to all of them.
+#
+#  The [sink|source] field specifies whether the property belongs to a source
+#  or a sink; it can only be "source" or "sink".
+#
+#  The [name] field specifies the name of the source or sink; it is user defined.
+#
+#  The [options] field is a property specific to this source or sink; the
+#  source or sink is responsible for parsing it.
+#
+#  Notes:
+#    1. Sinks should be added through configuration, like the console sink;
+#    the fully qualified class name must be given via the "class" property.
+#    2. Some sinks accept a polling period, like the console sink (10 seconds
+#    by default). Note that the minimal polling period is 1 second; any
+#    shorter period is illegal.
+#    3. A wildcard property can be overridden by an instance-specific
+#    property; for example, *.sink.console.period can be overridden by
+#    master.sink.console.period.
+#    4. A metrics-specific configuration
+#    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
+#    added as a Java system property using -Dspark.metrics.conf=xxx if you
+#    want to customize the metrics system, or you can put the file in
+#    ${SPARK_HOME}/conf and the metrics system will load it automatically.
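+#    5. As an example of the syntax (using values that appear below),
+#    "master.sink.console.period=15" breaks down as instance "master",
+#    type "sink", name "console" and option "period"; together with
+#    "master.sink.console.unit=seconds" it makes the master's console sink
+#    poll every 15 seconds.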
+
+# Enable JmxSink for all instances by class name
+#*.sink.jmx.class=spark.metrics.sink.JmxSink
+
+# Enable ConsoleSink for all instances by class name
 #*.sink.console.class=spark.metrics.sink.ConsoleSink
 
+# Polling period for ConsoleSink
 #*.sink.console.period=10
 
-#*.sink.console.unit=second
+#*.sink.console.unit=seconds
+
+# Master instance overrides the polling period
+#master.sink.console.period=15
+
+#master.sink.console.unit=seconds
+
+# Enable CsvSink for all instances
+#*.sink.csv.class=spark.metrics.sink.CsvSink
+
+# Polling period for CsvSink
+#*.sink.csv.period=1
+
+#*.sink.csv.unit=minutes
+
+# Output directory for CsvSink
+#*.sink.csv.directory=/tmp/
+
+# Worker instance overrides the polling period
+#worker.sink.csv.period=10
+
+#worker.sink.csv.unit=minutes
 
+# Enable JvmSource for the master, worker, driver and executor instances
 #master.source.jvm.class=spark.metrics.source.JvmSource
 
 #worker.source.jvm.class=spark.metrics.source.JvmSource
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1d592206c0054181f44455305556bb53dfd7da23..9692af52953f8f30517ef6b1abaa0ec42a75686b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -58,6 +58,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
+  val metricsSystem = MetricsSystem.createMetricsSystem("master")
   val masterSource = new MasterSource(this)
 
   val masterPublicAddress = {
@@ -77,12 +78,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
 
-    Master.metricsSystem.registerSource(masterSource)
-    Master.metricsSystem.start()
+    metricsSystem.registerSource(masterSource)
+    metricsSystem.start()
   }
 
   override def postStop() {
     webUi.stop()
+    metricsSystem.stop()
   }
 
   override def receive = {
@@ -322,17 +324,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       removeWorker(worker)
     }
   }
-
-  override def postStop() {
-    Master.metricsSystem.stop()
-  }
 }
 
 private[spark] object Master {
   private val systemName = "sparkMaster"
   private val actorName = "Master"
   private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
-  private val metricsSystem = MetricsSystem.createMetricsSystem("master")
 
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
index 65c22320d600ae08ff9b0e8cb9454ae61025156e..b8cfa6a7736841cd582ae5a8f2aa2fe71506f578 100644
--- a/core/src/main/scala/spark/deploy/master/MasterSource.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -19,7 +19,7 @@ private[spark] class MasterSource(val master: Master) extends Source {
   })
 
   // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
     override def getValue: Int = master.waitingApps.size
   })
 }
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 5c0f77fd7567b50176869f71bcc53e0384db1dbd..8fa0d12b828aa35b48505ae7c6856e8116b1e978 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -68,6 +68,7 @@ private[spark] class Worker(
   var coresUsed = 0
   var memoryUsed = 0
 
+  val metricsSystem = MetricsSystem.createMetricsSystem("worker")
   val workerSource = new WorkerSource(this)
 
   def coresFree: Int = cores - coresUsed
@@ -100,10 +101,9 @@ private[spark] class Worker(
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
     webUi.start()
     connectToMaster()
-    startWebUi()
 
-    Worker.metricsSystem.registerSource(workerSource)
-    Worker.metricsSystem.start()
+    metricsSystem.registerSource(workerSource)
+    metricsSystem.start()
   }
 
   def connectToMaster() {
@@ -185,14 +185,11 @@ private[spark] class Worker(
   override def postStop() {
     executors.values.foreach(_.kill())
     webUi.stop()
-
-    Worker.metricsSystem.stop()
+    metricsSystem.stop()
   }
 }
 
 private[spark] object Worker {
-  private val metricsSystem = MetricsSystem.createMetricsSystem("worker")
-
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
index 539eac71bdc2e3d83c38bcad4d2812a2e44aab2b..39cb8e56901a1f36178f2e1e3ba4d6fb490d74e0 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -8,27 +8,27 @@ private[spark] class WorkerSource(val worker: Worker) extends Source {
   val sourceName = "worker"
   val metricRegistry = new MetricRegistry()
 
-  metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
     override def getValue: Int = worker.executors.size
   })
 
   // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresUsed
   })
 
   // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryUsed
   })
 
   // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
     override def getValue: Int = worker.coresFree
   })
 
-  // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
+  // Gauge for memory free of this worker
+  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
     override def getValue: Int = worker.memoryFree
   })
 }
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
index d8b531cb58dc8754b5768d9a9c4b9ab37492873f..94116edfcf8841e978ce00e8ac6c13a1e0562c81 100644
--- a/core/src/main/scala/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -9,22 +9,22 @@ class ExecutorSource(val executor: Executor) extends Source {
   val sourceName = "executor"
 
   // Gauge for executor thread pool's actively executing task counts
-  metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getActiveCount()
   })
 
   // Gauge for executor thread pool's approximate total number of tasks that have been completed
-  metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
     override def getValue: Long = executor.threadPool.getCompletedTaskCount()
   })
 
   // Gauge for executor thread pool's current number of threads
-  metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getPoolSize()
   })
 
   // Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
-  metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
     override def getValue: Int = executor.threadPool.getMaximumPoolSize()
   })
 }
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index 5066b7ac22cbc0fee79b5fba1a70f0dff4ae783b..ed505b0aa73de2e7a7574553360f58f8607e36a2 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -1,44 +1,58 @@
 package spark.metrics
 
 import java.util.Properties
-import java.io.{File, FileInputStream}
+import java.io.{File, FileInputStream, InputStream, IOException}
 
 import scala.collection.mutable
 import scala.util.matching.Regex
 
-private[spark] class MetricsConfig(val configFile: String) {
-  val properties = new Properties()
+import spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+  initLogging()
+
   val DEFAULT_PREFIX = "*"
   val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  val METRICS_CONF = "metrics.properties"
+
+  val properties = new Properties()
   var propertyCategories: mutable.HashMap[String, Properties] = null
 
   private def setDefaultProperties(prop: Properties) {
-    prop.setProperty("*.sink.jmx.enabled", "default")
-    prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
+    // Empty for now; any default properties can be set here
   }
 
-  def initilize() {
+  def initialize() {
     //Add default properties in case there's no properties file
     setDefaultProperties(properties)
 
-    val confFile = new File(configFile)
-    if (confFile.exists()) {
-      var fis: FileInputStream = null
-      try {
-        fis = new FileInputStream(configFile)
-        properties.load(fis)
-      } finally {
-        fis.close()
+    // If spark.metrics.conf is not set, try to load the file from the classpath
+    var is: InputStream = null
+    try {
+      is = configFile match {
+        case Some(f) => new FileInputStream(f)
+        case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+      }
+
+      if (is != null) {
+        properties.load(is)
       }
+    } catch {
+      case e: Exception => logError("Error loading configuration file", e)
+    } finally {
+      if (is != null) is.close()
     }
 
     propertyCategories = subProperties(properties, INSTANCE_REGEX)
     if (propertyCategories.contains(DEFAULT_PREFIX)) {
       import scala.collection.JavaConversions._
+
       val defaultProperty = propertyCategories(DEFAULT_PREFIX)
-      for ((inst, prop) <- propertyCategories; p <- defaultProperty
-        if inst != DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
-        prop.setProperty(p._1, p._2)
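+      // Propagate each wildcard ("*") property to every specific instance,
+      // unless that instance already overrides the same key.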
+      for { (inst, prop) <- propertyCategories
+            if (inst != DEFAULT_PREFIX)
+            (k, v) <- defaultProperty
+            if (prop.getProperty(k) == null) } {
+        prop.setProperty(k, v)
       }
     }
   }
@@ -58,7 +72,7 @@ private[spark] class MetricsConfig(val configFile: String) {
   def getInstance(inst: String): Properties = {
     propertyCategories.get(inst) match {
       case Some(s) => s
-      case None => propertyCategories(DEFAULT_PREFIX)
+      case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
     }
   }
 }
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 54f6c6e4dacb666c14c43258ca9269349d5e516f..2f87577ff335f56f4c12f508cc4701bb1db60fa6 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -11,19 +11,51 @@ import spark.Logging
 import spark.metrics.sink.Sink
 import spark.metrics.source.Source
 
+/**
+ * Spark's metrics system, created for a specific "instance" and composed of sources and
+ * sinks, periodically polls metrics data from the sources to the sink destinations.
+ *
+ * "instance" specifies "who" (the role) uses the metrics system. In Spark there are several
+ * roles, such as master, worker, executor and client driver; each of them creates its own
+ * metrics system for monitoring, so instance represents one of these roles. Currently the
+ * following instances are implemented: master, worker, executor, driver.
+ *
+ * "source" specifies "where" (the source) metrics data is collected from. The metrics
+ * system has two kinds of source:
+ *   1. Spark internal sources, like MasterSource, WorkerSource, etc., which collect a Spark
+ *   component's internal state. These sources are tied to an instance and are added after
+ *   the specific metrics system is created.
+ *   2. Common sources, like JvmSource, which collect low-level state and are set up via
+ *   configuration and loaded through reflection.
+ *
+ * "sink" specifies "where" (the destination) metrics data is output to. Several sinks can
+ * coexist, and metrics are flushed to all of them.
+ *
+ * The metrics configuration format is as follows:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor" or "driver", which means only the
+ * specified instance has this property.
+ * A wildcard "*" can be used in place of the instance name, which means all instances will
+ * have this property.
+ *
+ * [sink|source] specifies whether the property belongs to a source or a sink; it can only
+ * be "source" or "sink".
+ *
+ * [name] specifies the name of the sink or source; it is user defined.
+ *
+ * [options] is a property specific to this source or sink.
+ */
 private[spark] class MetricsSystem private (val instance: String) extends Logging {
   initLogging()
 
-  val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
-  val metricsConfig = new MetricsConfig(confFile)
+  val confFile = System.getProperty("spark.metrics.conf")
+  val metricsConfig = new MetricsConfig(Option(confFile))
 
   val sinks = new mutable.ArrayBuffer[Sink]
   val sources = new mutable.ArrayBuffer[Source]
   val registry = new MetricRegistry()
 
-  val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
-
-  metricsConfig.initilize()
+  metricsConfig.initialize()
   registerSources()
   registerSinks()
 
@@ -37,7 +69,11 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
 
   def registerSource(source: Source) {
     sources += source
-    registry.register(source.sourceName, source.metricRegistry)
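+    // MetricRegistry.register throws IllegalArgumentException when a metric
+    // with the same name is already registered; log it instead of failing.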
+    try {
+      registry.register(source.sourceName, source.metricRegistry)
+    } catch {
+      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+    }
   }
 
   def registerSources() {
@@ -51,7 +87,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
         val source = Class.forName(classPath).newInstance()
         registerSource(source.asInstanceOf[Source])
       } catch {
-        case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
+        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
       }
     }
   }
@@ -61,18 +97,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
     val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
 
     sinkConfigs.foreach { kv =>
-      val classPath = if (DEFAULT_SINKS.contains(kv._1)) {
-        DEFAULT_SINKS(kv._1)
-      } else {
-        // For non-default sink, a property class should be set and create using reflection
-        kv._2.getProperty("class")
-      }
+      val classPath = kv._2.getProperty("class")
       try {
-        val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
+        val sink = Class.forName(classPath)
+          .getConstructor(classOf[Properties], classOf[MetricRegistry])
           .newInstance(kv._2, registry)
         sinks += sink.asInstanceOf[Sink]
       } catch {
-        case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
+        case e: Exception => logError("Sink class " + classPath + " cannot be instantiated", e)
       }
     }
   }
@@ -81,12 +113,17 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
 private[spark] object MetricsSystem {
   val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
   val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
-  val timeUnits = Map(
-      "illisecond" -> TimeUnit.MILLISECONDS,
-      "second" -> TimeUnit.SECONDS,
-      "minute" -> TimeUnit.MINUTES,
-      "hour" -> TimeUnit.HOURS,
-      "day" -> TimeUnit.DAYS)
-
-   def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+
+  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  val MINIMAL_POLL_PERIOD = 1
+
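+  // Any polling period shorter than one second is rejected; e.g. 500 milliseconds
+  // converts to 0 seconds, which is below MINIMAL_POLL_PERIOD.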
+  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+    if (period < MINIMAL_POLL_PERIOD) {
+      throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+        " is below the minimal polling period")
+    }
+  }
+
+  def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
 }
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 437f24a5759cc03a48fffab391132afe25c8829b..eaaac5d153a2e305377481b3fb94352821968ac8 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -8,22 +8,24 @@ import java.util.concurrent.TimeUnit
 import spark.metrics.MetricsSystem
 
 class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val CONSOLE_DEFAULT_PERIOD = "10"
-  val CONSOLE_DEFAULT_UNIT = "second"
+  val CONSOLE_DEFAULT_PERIOD = 10
+  val CONSOLE_DEFAULT_UNIT = "SECONDS"
 
   val CONSOLE_KEY_PERIOD = "period"
   val CONSOLE_KEY_UNIT = "unit"
 
   val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
     case Some(s) => s.toInt
-    case None => CONSOLE_DEFAULT_PERIOD.toInt
+    case None => CONSOLE_DEFAULT_PERIOD
   }
 
   val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
-    case Some(s) => MetricsSystem.timeUnits(s)
-    case None => MetricsSystem.timeUnits(CONSOLE_DEFAULT_UNIT)
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
   }
 
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
   val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
       .convertDurationsTo(TimeUnit.MILLISECONDS)
       .convertRatesTo(TimeUnit.SECONDS)
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index a8ca819e875465001ee79f14805234c372e76a57..aa5bff0d34d0da22dbaed3ad7832c3fe5e990adb 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -13,19 +13,21 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
   val CSV_KEY_UNIT = "unit"
   val CSV_KEY_DIR = "directory"
 
-  val CSV_DEFAULT_PERIOD = "10"
-  val CSV_DEFAULT_UNIT = "second"
+  val CSV_DEFAULT_PERIOD = 10
+  val CSV_DEFAULT_UNIT = "SECONDS"
   val CSV_DEFAULT_DIR = "/tmp/"
 
   val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
     case Some(s) => s.toInt
-    case None => CSV_DEFAULT_PERIOD.toInt
+    case None => CSV_DEFAULT_PERIOD
   }
 
   val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
-    case Some(s) => MetricsSystem.timeUnits(s)
-    case None => MetricsSystem.timeUnits(CSV_DEFAULT_UNIT)
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
   }
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
 
   val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
     case Some(s) => s
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
index 38158b8a2b95847c199b5c118f3fa5c0c720110d..87d27cc70d77fe4abd123ce2663e8897f4e1ab4a 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -8,23 +8,23 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
   val metricRegistry = new MetricRegistry()
   val sourceName = "DAGScheduler"
 
-  metricRegistry.register(MetricRegistry.name("stage", "failedStage"), new  Gauge[Int] {
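+  // Gauge for the number of failed stages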
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failed.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
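+  // Gauge for the number of running stages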
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.running.size
   })
 
-  metricRegistry.register(MetricRegistry.name("stage", "waitingStage"), new Gauge[Int] {
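+  // Gauge for the number of waiting stages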
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.waiting.size
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
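+  // Gauge for the total number of jobs submitted so far (the next run id)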
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.nextRunId.get()
   })
 
-  metricRegistry.register(MetricRegistry.name("job", "ActiveJobs"), new Gauge[Int] {
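+  // Gauge for the number of active jobs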
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.activeJobs.size
   })
 }
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
index f964827102411e798291fd6f3a50da7b9c66dec0..4faa715c9475fce4035e02804ba4a075a5cf13aa 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -9,27 +9,40 @@ private[spark] class BlockManagerSource(val blockManager: BlockManager) extends
   val metricRegistry = new MetricRegistry()
   val sourceName = "BlockManager"
 
-  metricRegistry.register(MetricRegistry.name("memory", "maxMem"), new Gauge[Long] {
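+  // Gauge for the total maximum memory of all block managers, in MB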
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-      maxMem
+      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      maxMem / 1024 / 1024
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("memory", "remainingMem"), new Gauge[Long] {
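+  // Gauge for the total remaining memory of all block managers, in MB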
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-      remainingMem
+      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      remainingMem / 1024 / 1024
     }
   })
 
-  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed"), new Gauge[Long] {
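+  // Gauge for the total memory used by all block managers, in MB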
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
     override def getValue: Long = {
       val storageStatusList = blockManager.master.getStorageStatus
-      val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
-      diskSpaceUsed
+      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      (maxMem - remainingMem) / 1024 / 1024
+    }
+  })
+
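+  // Gauge for the total disk space used by all blocks, in MB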
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+    override def getValue: Long = {
+      val storageStatusList = blockManager.master.getStorageStatus
+      val diskSpaceUsed = storageStatusList
+        .flatMap(_.blocks.values.map(_.diskSize))
+        .reduceOption(_ + _)
+        .getOrElse(0L)
+
+      diskSpaceUsed / 1024 / 1024
     }
   })
 }
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
index 201194000316c95024f5ca7c050e4298e144476d..2b31ddf2ebca651d7e2273c013e1e2dd9345b056 100644
--- a/core/src/test/resources/test_metrics_config.properties
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -1,6 +1,6 @@
 *.sink.console.period = 10
-*.sink.console.unit = second
+*.sink.console.unit = seconds
 *.source.jvm.class = spark.metrics.source.JvmSource
 master.sink.console.period = 20
-master.sink.console.unit = minute
-  
+master.sink.console.unit = minutes
+
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
index 06afbc662505a6a0942e7404da35534a640ca618..d5479f02980af9c96bc3202e109277a435ea69d4 100644
--- a/core/src/test/resources/test_metrics_system.properties
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -1,7 +1,7 @@
 *.sink.console.period = 10
-*.sink.console.unit = second
+*.sink.console.unit = seconds
 test.sink.console.class = spark.metrics.sink.ConsoleSink
 test.sink.dummy.class = spark.metrics.sink.DummySink
 test.source.dummy.class = spark.metrics.source.DummySource
 test.sink.console.period = 20
-test.sink.console.unit = minute
+test.sink.console.unit = minutes
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
index bb1be4f4fc35b1e8f5fe422ff11d05e5f6f23f51..87cd2ffad25ba11b8a2c377276c5a516d70a3c5d 100644
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -15,40 +15,36 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("MetricsConfig with default properties") {
-    val conf = new MetricsConfig("dummy-file")
-    conf.initilize()
+    val conf = new MetricsConfig(Option("dummy-file"))
+    conf.initialize()
 
-    assert(conf.properties.size() === 2)
-    assert(conf.properties.getProperty("*.sink.jmx.enabled") === "default")
-    assert(conf.properties.getProperty("*.source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(conf.properties.size() === 0)
     assert(conf.properties.getProperty("test-for-dummy") === null)
 
     val property = conf.getInstance("random")
-    assert(property.size() === 2)
-    assert(property.getProperty("sink.jmx.enabled") === "default")
-    assert(property.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+    assert(property.size() === 0)
   }
 
   test("MetricsConfig with properties set") {
-    val conf = new MetricsConfig(filePath)
-    conf.initilize()
+    val conf = new MetricsConfig(Option(filePath))
+    conf.initialize()
 
     val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 4)
+    assert(masterProp.size() === 3)
     assert(masterProp.getProperty("sink.console.period") === "20")
-    assert(masterProp.getProperty("sink.console.unit") === "minute")
-    assert(masterProp.getProperty("sink.jmx.enabled") === "default")
-    assert(masterProp.getProperty("source.jvm.class") == "spark.metrics.source.JvmSource")
+    assert(masterProp.getProperty("sink.console.unit") === "minutes")
+    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
 
     val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 4)
+    assert(workerProp.size() === 3)
     assert(workerProp.getProperty("sink.console.period") === "10")
-    assert(workerProp.getProperty("sink.console.unit") === "second")
+    assert(workerProp.getProperty("sink.console.unit") === "seconds")
+    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
   }
 
   test("MetricsConfig with subProperties") {
-    val conf = new MetricsConfig(filePath)
-    conf.initilize()
+    val conf = new MetricsConfig(Option(filePath))
+    conf.initialize()
 
     val propCategories = conf.propertyCategories
     assert(propCategories.size === 2)
@@ -59,14 +55,10 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
     assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
 
     val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 2)
+    assert(sinkProps.size === 1)
     assert(sinkProps.contains("console"))
-    assert(sinkProps.contains("jmx"))
 
     val consoleProps = sinkProps("console")
     assert(consoleProps.size() === 2)
-
-    val jmxProps = sinkProps("jmx")
-    assert(jmxProps.size() === 1)
   }
 }
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
index 462c28e8949b245e16460d15666fcf953384b15c..c1899964172fa235d4525e7958e3110794e20bb6 100644
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -12,7 +12,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
 
   before {
     filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
-    System.setProperty("spark.metrics.conf.file", filePath)
+    System.setProperty("spark.metrics.conf", filePath)
   }
 
   test("MetricsSystem with default config") {
@@ -20,9 +20,8 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
     val sources = metricsSystem.sources
     val sinks = metricsSystem.sinks
 
-    assert(sources.length === 1)
-    assert(sinks.length === 1)
-    assert(sources(0).sourceName === "jvm")
+    assert(sources.length === 0)
+    assert(sinks.length === 0)
   }
 
   test("MetricsSystem with sources add") {
@@ -30,11 +29,11 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
     val sources = metricsSystem.sources
     val sinks = metricsSystem.sinks
 
-    assert(sources.length === 1)
-    assert(sinks.length === 2)
+    assert(sources.length === 0)
+    assert(sinks.length === 1)
 
     val source = new spark.deploy.master.MasterSource(null)
     metricsSystem.registerSource(source)
-    assert(sources.length === 2)
+    assert(sources.length === 1)
   }
 }