diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
new file mode 100644
index 0000000000000000000000000000000000000000..0486ca4c79213edb35f8f56a3123deecd289e029
--- /dev/null
+++ b/conf/metrics.properties.template
@@ -0,0 +1,87 @@
+# syntax: [instance].[sink|source].[name].[options]
+
+#  "instance" specify "who" (the role) use metrics system. In spark there are
+#  several roles like master, worker, executor, driver, these roles will
+#  create metrics system for monitoring. So instance represents these roles.
+#  Currently in Spark, several instances have already implemented: master,
+#  worker, executor, driver.
+#
+#  [instance] field can be "master", "worker", "executor", "driver", which means
+#  only the specified instance has this property.
+#  a wild card "*" can be used to represent instance name, which means all the
+#  instances will have this property.
+#
+#  "source" specify "where" (source) to collect metrics data. In metrics system,
+#  there exists two kinds of source:
+#    1. Spark internal source, like MasterSource, WorkerSource, etc, which will
+#    collect Spark component's internal state, these sources are related to
+#    instance and will be added after specific metrics system is created.
+#    2. Common source, like JvmSource, which will collect low level state, is
+#    configured by configuration and loaded through reflection.
+#
+#  "sink" specify "where" (destination) to output metrics data to. Several sinks
+#  can be coexisted and flush metrics to all these sinks.
+#
+#  [sink|source] field specify this property is source related or sink, this
+#  field can only be source or sink.
+#
+#  [name] field specify the name of source or sink, this is custom defined.
+#
+#  [options] field is the specific property of this source or sink, this source
+#  or sink is responsible for parsing this property.
+#
+#  Notes:
+#    1. Sinks should be added through configuration. For example, to enable the
+#    console sink, specify its fully qualified class name via the class property.
+#    2. Some sinks accept a polling period; for the console sink it defaults to
+#    10 seconds. Note that the minimal polling period is 1 second; any period
+#    below 1 second is illegal.
+#    3. A wildcard property can be overridden by an instance-specific property;
+#    for example, *.sink.console.period can be overridden by master.sink.console.period.
+#    4. A metrics-specific configuration
+#    "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
+#    passed as a Java property using -Dspark.metrics.conf=xxx if you want to
+#    customize the metrics system; alternatively, put the file in ${SPARK_HOME}/conf
+#    and the metrics system will find and load it automatically.
+
+# Enable JmxSink for all instances by class name
+#*.sink.jmx.class=spark.metrics.sink.JmxSink
+
+# Enable ConsoleSink for all instances by class name
+#*.sink.console.class=spark.metrics.sink.ConsoleSink
+
+# Polling period for ConsoleSink
+#*.sink.console.period=10
+
+#*.sink.console.unit=seconds
+
+# Master instance overrides the polling period
+#master.sink.console.period=15
+
+#master.sink.console.unit=seconds
+
+# Enable CsvSink for all instances
+#*.sink.csv.class=spark.metrics.sink.CsvSink
+
+# Polling period for CsvSink
+#*.sink.csv.period=1
+
+#*.sink.csv.unit=minutes
+
+# Output directory for CsvSink
+#*.sink.csv.directory=/tmp/
+
+# Worker instance overrides the polling period
+#worker.sink.csv.period=10
+
+#worker.sink.csv.unit=minutes
+
+# Enable JvmSource for the master, worker, driver and executor instances
+#master.source.jvm.class=spark.metrics.source.JvmSource
+
+#worker.source.jvm.class=spark.metrics.source.JvmSource
+
+#driver.source.jvm.class=spark.metrics.source.JvmSource
+
+#executor.source.jvm.class=spark.metrics.source.JvmSource
+
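
For reference, the spark.metrics.conf property named in note 4 above can also be set programmatically before the SparkContext is created; the metrics test suite does this with System.setProperty. The sketch below is illustrative only — the object name and file path are not part of this patch.

    import spark.SparkContext

    object MetricsConfigExample {
      def main(args: Array[String]) {
        // Equivalent to passing -Dspark.metrics.conf=... on the JVM command line.
        System.setProperty("spark.metrics.conf", "/path/to/metrics.properties")

        // The driver's metrics system, created inside SparkEnv, will pick up the
        // sinks and sources configured for the "driver" and "*" instances.
        val sc = new SparkContext("local", "MetricsConfigExample")
        sc.stop()
      }
    }
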
diff --git a/core/pom.xml b/core/pom.xml
index 6329b2fbd8809dcda49e3d3a38a460661239bf53..237d988712750124ec93858b5b625019fe6e996d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -108,6 +108,14 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.codahale.metrics</groupId>
+      <artifactId>metrics-jvm</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.derby</groupId>
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 24ba60564665df6ada98d4154c5d706eab6be126..77cb0ee0cd6b06d1cc286023030e059381bb7594 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,13 +60,14 @@ import org.apache.mesos.MesosNativeLibrary
 import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import spark.partial.{ApproximateEvaluator, PartialResult}
 import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
 import ui.{SparkUI}
+import spark.metrics._
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -270,6 +271,16 @@ class SparkContext(
   // Post init
   taskScheduler.postStartHook()
 
+  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+
+  def initDriverMetrics() {
+    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+  }
+
+  initDriverMetrics()
+
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD. */
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index f2bdc11bdb2a81bf3a76e148e1fe255f3353c24a..4a1d341f5d26b5f60f67c0b015d102a77923e908 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -24,6 +24,7 @@ import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
 import akka.remote.RemoteActorRefProvider
 
 import spark.broadcast.BroadcastManager
+import spark.metrics.MetricsSystem
 import spark.storage.BlockManager
 import spark.storage.BlockManagerMaster
 import spark.network.ConnectionManager
@@ -53,6 +54,7 @@ class SparkEnv (
     val connectionManager: ConnectionManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
+    val metricsSystem: MetricsSystem,
     // To be set only as part of initialization of SparkContext.
     // (executorId, defaultHostPort) => executorHostPort
     // If executorId is NOT found, return defaultHostPort
@@ -68,6 +70,7 @@ class SparkEnv (
     broadcastManager.stop()
     blockManager.stop()
     blockManager.master.stop()
+    metricsSystem.stop()
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
@@ -184,6 +187,13 @@ object SparkEnv extends Logging {
     httpFileServer.initialize()
     System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
 
+    val metricsSystem = if (isDriver) {
+      MetricsSystem.createMetricsSystem("driver")
+    } else {
+      MetricsSystem.createMetricsSystem("executor")
+    }
+    metricsSystem.start()
+
     // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working
     // directory.
@@ -213,6 +223,7 @@ object SparkEnv extends Logging {
       connectionManager,
       httpFileServer,
       sparkFilesDir,
+      metricsSystem,
       None)
   }
 }
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index eddcafd84d190bc7df37e126bd18efab0f3c5550..9692af52953f8f30517ef6b1abaa0ec42a75686b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import spark.deploy._
 import spark.{Logging, SparkException, Utils}
+import spark.metrics.MetricsSystem
 import spark.util.AkkaUtils
 import ui.MasterWebUI
 
@@ -57,6 +58,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
+  val metricsSystem = MetricsSystem.createMetricsSystem("master")
+  val masterSource = new MasterSource(this)
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
@@ -73,10 +77,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+
+    metricsSystem.registerSource(masterSource)
+    metricsSystem.start()
   }
 
   override def postStop() {
     webUi.stop()
+    metricsSystem.stop()
   }
 
   override def receive = {
diff --git a/core/src/main/scala/spark/deploy/master/MasterSource.scala b/core/src/main/scala/spark/deploy/master/MasterSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..b8cfa6a7736841cd582ae5a8f2aa2fe71506f578
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/MasterSource.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class MasterSource(val master: Master) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "master"
+
+  // Gauge for the number of workers in the cluster
+  metricRegistry.register(MetricRegistry.name("workers", "number"), new Gauge[Int] {
+    override def getValue: Int = master.workers.size
+  })
+
+  // Gauge for the number of applications in the cluster
+  metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.apps.size
+  })
+
+  // Gauge for the number of waiting applications in the cluster
+  metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
+    override def getValue: Int = master.waitingApps.size
+  })
+}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0bd88ea2537d8cecdc1f147a6b5c345b1cf6ce1a..8fa0d12b828aa35b48505ae7c6856e8116b1e978 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -23,6 +23,7 @@ import akka.util.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
+import spark.metrics.MetricsSystem
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -67,6 +68,9 @@ private[spark] class Worker(
   var coresUsed = 0
   var memoryUsed = 0
 
+  val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+  val workerSource = new WorkerSource(this)
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -97,6 +101,9 @@ private[spark] class Worker(
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
     webUi.start()
     connectToMaster()
+
+    metricsSystem.registerSource(workerSource)
+    metricsSystem.start()
   }
 
   def connectToMaster() {
@@ -155,10 +162,10 @@ private[spark] class Worker(
 
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       masterDisconnected()
-      
+
     case RequestWorkerState => {
       sender ! WorkerState(host, port, workerId, executors.values.toList,
-        finishedExecutors.values.toList, masterUrl, cores, memory, 
+        finishedExecutors.values.toList, masterUrl, cores, memory,
         coresUsed, memoryUsed, masterWebUiUrl)
     }
   }
@@ -178,6 +185,7 @@ private[spark] class Worker(
   override def postStop() {
     executors.values.foreach(_.kill())
     webUi.stop()
+    metricsSystem.stop()
   }
 }
 
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerSource.scala b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..39cb8e56901a1f36178f2e1e3ba4d6fb490d74e0
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/WorkerSource.scala
@@ -0,0 +1,34 @@
+package spark.deploy.worker
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class WorkerSource(val worker: Worker) extends Source {
+  val sourceName = "worker"
+  val metricRegistry = new MetricRegistry()
+
+  metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.executors.size
+  })
+
+  // Gauge for the number of cores used by this worker
+  metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresUsed
+  })
+
+  // Gauge for the memory used by this worker, in MB
+  metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryUsed
+  })
+
+  // Gauge for the number of cores free on this worker
+  metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
+    override def getValue: Int = worker.coresFree
+  })
+
+  // Gauge for the memory free on this worker, in MB
+  metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
+    override def getValue: Int = worker.memoryFree
+  })
+}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 2e81151882837e92b4e97131b6c9e5d6ab5661e1..8a74a8d853afd8abec3cc314c8415abdcc30a113 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -69,7 +69,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
       override def uncaughtException(thread: Thread, exception: Throwable) {
         try {
           logError("Uncaught exception in thread " + thread, exception)
-          
+
           // We may have been called from a shutdown hook. If so, we must not call System.exit().
           // (If we do, we will deadlock.)
           if (!Utils.inShutdown()) {
@@ -87,9 +87,13 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
     }
   )
 
+  val executorSource = new ExecutorSource(this)
+
   // Initialize Spark environment (using system properties read above)
   val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
   SparkEnv.set(env)
+  env.metricsSystem.registerSource(executorSource)
+
   private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
 
   // Start worker thread pool
diff --git a/core/src/main/scala/spark/executor/ExecutorSource.scala b/core/src/main/scala/spark/executor/ExecutorSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..94116edfcf8841e978ce00e8ac6c13a1e0562c81
--- /dev/null
+++ b/core/src/main/scala/spark/executor/ExecutorSource.scala
@@ -0,0 +1,30 @@
+package spark.executor
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ExecutorSource(val executor: Executor) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "executor"
+
+  // Gauge for the number of tasks currently being executed by the executor thread pool
+  metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getActiveCount()
+  })
+
+  // Gauge for executor thread pool's approximate total number of tasks that have been completed
+  metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
+    override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+  })
+
+  // Gauge for executor thread pool's current number of threads
+  metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getPoolSize()
+  })
+
+  // Gauge for executor thread pool's maximum allowed number of threads
+  metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
+    override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+  })
+}
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
new file mode 100644
index 0000000000000000000000000000000000000000..ed505b0aa73de2e7a7574553360f58f8607e36a2
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -0,0 +1,79 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileInputStream, InputStream, IOException}
+
+import scala.collection.mutable
+import scala.util.matching.Regex
+
+import spark.Logging
+
+private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
+  initLogging()
+
+  val DEFAULT_PREFIX = "*"
+  val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
+  val METRICS_CONF = "metrics.properties"
+
+  val properties = new Properties()
+  var propertyCategories: mutable.HashMap[String, Properties] = null
+
+  private def setDefaultProperties(prop: Properties) {
+    // empty function, any default property can be set here
+  }
+
+  def initialize() {
+    // Add default properties in case there's no properties file
+    setDefaultProperties(properties)
+
+    // If spark.metrics.conf is not set, try to find the file on the classpath
+    var is: InputStream = null
+    try {
+      is = configFile match {
+        case Some(f) => new FileInputStream(f)
+        case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
+      }
+
+      if (is != null) {
+        properties.load(is)
+      }
+    } catch {
+      case e: Exception => logError("Error loading configure file", e)
+    } finally {
+      if (is != null) is.close()
+    }
+
+    propertyCategories = subProperties(properties, INSTANCE_REGEX)
+    if (propertyCategories.contains(DEFAULT_PREFIX)) {
+      import scala.collection.JavaConversions._
+
+      val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+      for { (inst, prop) <- propertyCategories
+            if (inst != DEFAULT_PREFIX)
+            (k, v) <- defaultProperty
+            if (prop.getProperty(k) == null) } {
+        prop.setProperty(k, v)
+      }
+    }
+  }
+
+  def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
+    val subProperties = new mutable.HashMap[String, Properties]
+    import scala.collection.JavaConversions._
+    prop.foreach { kv =>
+      if (regex.findPrefixOf(kv._1) != None) {
+        val regex(prefix, suffix) = kv._1
+        subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
+      }
+    }
+    subProperties
+  }
+
+  def getInstance(inst: String): Properties = {
+    propertyCategories.get(inst) match {
+      case Some(s) => s
+      case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
+    }
+  }
+}
+
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
new file mode 100644
index 0000000000000000000000000000000000000000..2f87577ff335f56f4c12f508cc4701bb1db60fa6
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -0,0 +1,129 @@
+package spark.metrics
+
+import com.codahale.metrics.MetricRegistry
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import spark.Logging
+import spark.metrics.sink.Sink
+import spark.metrics.source.Source
+
+/**
+ * A Spark metrics system, created for a specific "instance" and composed of sources and
+ * sinks, which periodically polls metrics data from the sources and delivers it to the sinks.
+ *
+ * "instance" specifies "who" (the role) uses the metrics system. In Spark there are several
+ * roles, such as master, worker, executor and client driver; each of these roles creates its
+ * own metrics system for monitoring. So an instance represents one of these roles. Currently
+ * Spark implements the following instances: master, worker, executor, driver.
+ *
+ * "source" specifies "where" (the source) metrics data is collected from. The metrics system
+ * has two kinds of sources:
+ *   1. Spark internal sources, like MasterSource, WorkerSource, etc., which collect a Spark
+ *   component's internal state. These sources are tied to an instance and are added after
+ *   the specific metrics system is created.
+ *   2. Common sources, like JvmSource, which collect low-level state. They are set up through
+ *   configuration and loaded via reflection.
+ *
+ * "sink" specifies "where" (the destination) metrics data is output to. Several sinks can
+ * coexist, and metrics are flushed to all of them.
+ *
+ * The metrics configuration format is:
+ * [instance].[sink|source].[name].[options] = xxxx
+ *
+ * [instance] can be "master", "worker", "executor" or "driver", which means only the specified
+ * instance has this property.
+ * A wildcard "*" can be used in place of the instance name, which means all instances will
+ * have this property.
+ *
+ * [sink|source] indicates whether this property belongs to a source or a sink; it can only be "source" or "sink".
+ *
+ * [name] specifies the name of the sink or source; it is user defined.
+ *
+ * [options] is a property specific to this source or sink.
+ */
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
+  initLogging()
+
+  val confFile = System.getProperty("spark.metrics.conf")
+  val metricsConfig = new MetricsConfig(Option(confFile))
+
+  val sinks = new mutable.ArrayBuffer[Sink]
+  val sources = new mutable.ArrayBuffer[Source]
+  val registry = new MetricRegistry()
+
+  metricsConfig.initialize()
+  registerSources()
+  registerSinks()
+
+  def start() {
+    sinks.foreach(_.start)
+  }
+
+  def stop() {
+    sinks.foreach(_.stop)
+  }
+
+  def registerSource(source: Source) {
+    sources += source
+    try {
+      registry.register(source.sourceName, source.metricRegistry)
+    } catch {
+      case e: IllegalArgumentException => logInfo("Metrics already registered", e)
+    }
+  }
+
+  def registerSources() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+
+    // Register all the sources related to this instance
+    sourceConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val source = Class.forName(classPath).newInstance()
+        registerSource(source.asInstanceOf[Source])
+      } catch {
+        case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+      }
+    }
+  }
+
+  def registerSinks() {
+    val instConfig = metricsConfig.getInstance(instance)
+    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+
+    sinkConfigs.foreach { kv =>
+      val classPath = kv._2.getProperty("class")
+      try {
+        val sink = Class.forName(classPath)
+          .getConstructor(classOf[Properties], classOf[MetricRegistry])
+          .newInstance(kv._2, registry)
+        sinks += sink.asInstanceOf[Sink]
+      } catch {
+        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
+      }
+    }
+  }
+}
+
+private[spark] object MetricsSystem {
+  val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
+  val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
+
+  val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
+  val MINIMAL_POLL_PERIOD = 1
+
+  def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
+    val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
+    if (period < MINIMAL_POLL_PERIOD) {
+      throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
+        " is below the minimal polling period")
+    }
+  }
+
+  def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+}
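
To illustrate how the pieces above fit together, here is a minimal sketch of the lifecycle an instance goes through, mirroring what Master and Worker do elsewhere in this patch; the object name is illustrative only.

    import spark.metrics.MetricsSystem
    import spark.metrics.source.JvmSource

    object MetricsSystemSketch {
      def main(args: Array[String]) {
        // Creating the system loads the config and registers configured sources and sinks.
        val metricsSystem = MetricsSystem.createMetricsSystem("driver")

        // Instance-specific sources are registered explicitly, as Master and Worker do.
        metricsSystem.registerSource(new JvmSource)

        metricsSystem.start()   // starts every configured sink
        // ... run the instance ...
        metricsSystem.stop()    // stops every configured sink
      }
    }
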
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..eaaac5d153a2e305377481b3fb94352821968ac8
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -0,0 +1,42 @@
+package spark.metrics.sink
+
+import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val CONSOLE_DEFAULT_PERIOD = 10
+  val CONSOLE_DEFAULT_UNIT = "SECONDS"
+
+  val CONSOLE_KEY_PERIOD = "period"
+  val CONSOLE_KEY_UNIT = "unit"
+
+  val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => CONSOLE_DEFAULT_PERIOD
+  }
+
+  val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
+  }
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .build()
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
+
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..aa5bff0d34d0da22dbaed3ad7832c3fe5e990adb
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -0,0 +1,51 @@
+package spark.metrics.sink
+
+import com.codahale.metrics.{CsvReporter, MetricRegistry}
+
+import java.io.File
+import java.util.{Locale, Properties}
+import java.util.concurrent.TimeUnit
+
+import spark.metrics.MetricsSystem
+
+class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val CSV_KEY_PERIOD = "period"
+  val CSV_KEY_UNIT = "unit"
+  val CSV_KEY_DIR = "directory"
+
+  val CSV_DEFAULT_PERIOD = 10
+  val CSV_DEFAULT_UNIT = "SECONDS"
+  val CSV_DEFAULT_DIR = "/tmp/"
+
+  val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
+    case Some(s) => s.toInt
+    case None => CSV_DEFAULT_PERIOD
+  }
+
+  val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
+    case Some(s) => TimeUnit.valueOf(s.toUpperCase())
+    case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
+  }
+
+  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+  val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
+    case Some(s) => s
+    case None => CSV_DEFAULT_DIR
+  }
+
+  val reporter: CsvReporter = CsvReporter.forRegistry(registry)
+      .formatFor(Locale.US)
+      .convertDurationsTo(TimeUnit.MILLISECONDS)
+      .convertRatesTo(TimeUnit.SECONDS)
+      .build(new File(pollDir))
+
+  override def start() {
+    reporter.start(pollPeriod, pollUnit)
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+}
+
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..6a40885b7818cfd5335755f34fe714fcf286379c
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -0,0 +1,18 @@
+package spark.metrics.sink
+
+import com.codahale.metrics.{JmxReporter, MetricRegistry}
+
+import java.util.Properties
+
+class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
+  val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
+
+  override def start() {
+    reporter.start()
+  }
+
+  override def stop() {
+    reporter.stop()
+  }
+
+}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
new file mode 100644
index 0000000000000000000000000000000000000000..3ffdcbdaba41459f7b46ac8c49e48646a6130cfd
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -0,0 +1,6 @@
+package spark.metrics.sink
+
+trait Sink {
+  def start: Unit
+  def stop: Unit
+}
\ No newline at end of file
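
Any sink referenced from the configuration must, in addition to implementing this trait, expose a (Properties, MetricRegistry) constructor, since MetricsSystem.registerSinks instantiates it reflectively. A minimal custom sink might look like the sketch below; the StderrSink class is illustrative and not part of this patch.

    package spark.metrics.sink

    import java.util.Properties
    import java.util.concurrent.TimeUnit

    import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

    // Writes metrics to stderr every 10 seconds; would be enabled via
    // *.sink.stderr.class=spark.metrics.sink.StderrSink (hypothetical key).
    class StderrSink(val property: Properties, val registry: MetricRegistry) extends Sink {
      val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
          .outputTo(System.err)
          .convertDurationsTo(TimeUnit.MILLISECONDS)
          .convertRatesTo(TimeUnit.SECONDS)
          .build()

      override def start() {
        reporter.start(10, TimeUnit.SECONDS)
      }

      override def stop() {
        reporter.stop()
      }
    }
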
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..79f505079c8bab7a7d8437ee2da47348530db2c9
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -0,0 +1,15 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+  val sourceName = "jvm"
+  val metricRegistry = new MetricRegistry()
+
+  val gcMetricSet = new GarbageCollectorMetricSet
+  val memGaugeSet = new MemoryUsageGaugeSet
+
+  metricRegistry.registerAll(gcMetricSet)
+  metricRegistry.registerAll(memGaugeSet)
+}
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
new file mode 100644
index 0000000000000000000000000000000000000000..5607e2c40a719f47d81098480308815be9da6245
--- /dev/null
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -0,0 +1,8 @@
+package spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+
+trait Source {
+  def sourceName: String
+  def metricRegistry: MetricRegistry
+}
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..87d27cc70d77fe4abd123ce2663e8897f4e1ab4a
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,30 @@
+package spark.scheduler
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "DAGScheduler"
+
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.failed.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.running.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.waiting.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.nextRunId.get()
+  })
+
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.activeJobs.size
+  })
+}
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4faa715c9475fce4035e02804ba4a075a5cf13aa
--- /dev/null
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -0,0 +1,48 @@
+package spark.storage
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+import spark.storage._
+
+private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "BlockManager"
+
+  metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
+    override def getValue: Long = {
+      val storageStatusList = blockManager.master.getStorageStatus
+      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      maxMem / 1024 / 1024
+    }
+  })
+
+  metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
+    override def getValue: Long = {
+      val storageStatusList = blockManager.master.getStorageStatus
+      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      remainingMem / 1024 / 1024
+    }
+  })
+
+  metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
+    override def getValue: Long = {
+      val storageStatusList = blockManager.master.getStorageStatus
+      val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
+      val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
+      (maxMem - remainingMem) / 1024 / 1024
+    }
+  })
+
+  metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
+    override def getValue: Long = {
+      val storageStatusList = blockManager.master.getStorageStatus
+      val diskSpaceUsed = storageStatusList
+        .flatMap(_.blocks.values.map(_.diskSize))
+        .reduceOption(_ + _)
+        .getOrElse(0L)
+
+      diskSpaceUsed / 1024 / 1024
+    }
+  })
+}
diff --git a/core/src/test/resources/test_metrics_config.properties b/core/src/test/resources/test_metrics_config.properties
new file mode 100644
index 0000000000000000000000000000000000000000..2b31ddf2ebca651d7e2273c013e1e2dd9345b056
--- /dev/null
+++ b/core/src/test/resources/test_metrics_config.properties
@@ -0,0 +1,6 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+*.source.jvm.class = spark.metrics.source.JvmSource
+master.sink.console.period = 20
+master.sink.console.unit = minutes
+
diff --git a/core/src/test/resources/test_metrics_system.properties b/core/src/test/resources/test_metrics_system.properties
new file mode 100644
index 0000000000000000000000000000000000000000..d5479f02980af9c96bc3202e109277a435ea69d4
--- /dev/null
+++ b/core/src/test/resources/test_metrics_system.properties
@@ -0,0 +1,7 @@
+*.sink.console.period = 10
+*.sink.console.unit = seconds
+test.sink.console.class = spark.metrics.sink.ConsoleSink
+test.sink.dummy.class = spark.metrics.sink.DummySink
+test.source.dummy.class = spark.metrics.source.DummySource
+test.sink.console.period = 20
+test.sink.console.unit = minutes
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..87cd2ffad25ba11b8a2c377276c5a516d70a3c5d
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
@@ -0,0 +1,64 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
+  var filePath: String = _
+
+  before {
+    filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
+  }
+
+  test("MetricsConfig with default properties") {
+    val conf = new MetricsConfig(Option("dummy-file"))
+    conf.initialize()
+
+    assert(conf.properties.size() === 0)
+    assert(conf.properties.getProperty("test-for-dummy") === null)
+
+    val property = conf.getInstance("random")
+    assert(property.size() === 0)
+  }
+
+  test("MetricsConfig with properties set") {
+    val conf = new MetricsConfig(Option(filePath))
+    conf.initialize()
+
+    val masterProp = conf.getInstance("master")
+    assert(masterProp.size() === 3)
+    assert(masterProp.getProperty("sink.console.period") === "20")
+    assert(masterProp.getProperty("sink.console.unit") === "minutes")
+    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+
+    val workerProp = conf.getInstance("worker")
+    assert(workerProp.size() === 3)
+    assert(workerProp.getProperty("sink.console.period") === "10")
+    assert(workerProp.getProperty("sink.console.unit") === "seconds")
+    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
+  }
+
+  test("MetricsConfig with subProperties") {
+    val conf = new MetricsConfig(Option(filePath))
+    conf.initialize()
+
+    val propCategories = conf.propertyCategories
+    assert(propCategories.size === 2)
+
+    val masterProp = conf.getInstance("master")
+    val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+    assert(sourceProps.size === 1)
+    assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
+
+    val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+    assert(sinkProps.size === 1)
+    assert(sinkProps.contains("console"))
+
+    val consoleProps = sinkProps("console")
+    assert(consoleProps.size() === 2)
+  }
+}
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
new file mode 100644
index 0000000000000000000000000000000000000000..c1899964172fa235d4525e7958e3110794e20bb6
--- /dev/null
+++ b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
@@ -0,0 +1,39 @@
+package spark.metrics
+
+import java.util.Properties
+import java.io.{File, FileOutputStream}
+
+import org.scalatest.{BeforeAndAfter, FunSuite}
+
+import spark.metrics._
+
+class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
+  var filePath: String = _
+
+  before {
+    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
+    System.setProperty("spark.metrics.conf", filePath)
+  }
+
+  test("MetricsSystem with default config") {
+    val metricsSystem = MetricsSystem.createMetricsSystem("default")
+    val sources = metricsSystem.sources
+    val sinks = metricsSystem.sinks
+
+    assert(sources.length === 0)
+    assert(sinks.length === 0)
+  }
+
+  test("MetricsSystem with sources add") {
+    val metricsSystem = MetricsSystem.createMetricsSystem("test")
+    val sources = metricsSystem.sources
+    val sinks = metricsSystem.sinks
+
+    assert(sources.length === 0)
+    assert(sinks.length === 1)
+
+    val source = new spark.deploy.master.MasterSource(null)
+    metricsSystem.registerSource(source)
+    assert(sources.length === 1)
+  }
+}
diff --git a/pom.xml b/pom.xml
index eb7bd7e9df9835013e768d792d37db5c59be655f..3fbd93c7dd5ef03d8d6a7167e0975c9cc725d2d3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -268,6 +268,16 @@
         <groupId>org.scala-lang</groupId>
         <artifactId>scalap</artifactId>
         <version>${scala.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-core</artifactId>
+        <version>3.0.0</version>
+      </dependency>
+      <dependency>
+        <groupId>com.codahale.metrics</groupId>
+        <artifactId>metrics-jvm</artifactId>
+        <version>3.0.0</version>
       </dependency>
 
       <dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index f3f67b57c89d7ab6fa5962361e85e5cb9c6c9235..d4d70afdd56c1cf71ebbb1580e2c97086af096e4 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -179,7 +179,9 @@ object SparkBuild extends Build {
       "net.liftweb" % "lift-json_2.9.2" % "2.5",
       "org.apache.mesos" % "mesos" % "0.9.0-incubating",
       "io.netty" % "netty-all" % "4.0.0.Beta2",
-      "org.apache.derby" % "derby" % "10.4.2.0" % "test"
+      "org.apache.derby" % "derby" % "10.4.2.0" % "test",
+      "com.codahale.metrics" % "metrics-core" % "3.0.0",
+      "com.codahale.metrics" % "metrics-jvm" % "3.0.0"
     ) ++ (
       if (HADOOP_MAJOR_VERSION == "2") {
         if (HADOOP_YARN) {