Skip to content
Snippets Groups Projects
Commit 4d6dd67f authored by Andrew xia's avatar Andrew xia Committed by jerryshao
Browse files

refactor metrics system

1. Change the Source abstract class to expose a MetricRegistry instead of a raw metric map.
2. Update the Master/Worker/JVM source classes accordingly.
parent 03f98711
No related branches found
No related tags found
No related merge requests found
...@@ -4,32 +4,32 @@ import java.util.{Map, HashMap => JHashMap} ...@@ -4,32 +4,32 @@ import java.util.{Map, HashMap => JHashMap}
import com.codahale.metrics.{Gauge, Metric} import com.codahale.metrics.{Gauge, Metric}
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source import spark.metrics.source.Source
import spark.Logging
/**
 * Metrics source for a standalone Master. Registers gauges for cluster-level
 * counts (workers, running apps, waiting apps) into its own MetricRegistry,
 * which the MetricsSystem later mounts under this source's name.
 */
private[spark] class MasterInstrumentation(val master: Master) extends Source {
  val className = classOf[Master].getName()
  val instrumentationName = "master"

  // Per-source registry; the MetricsSystem namespaces it by sourceName.
  val metricRegistry = new MetricRegistry()

  val sourceName = instrumentationName

  // Gauge for worker numbers in cluster
  metricRegistry.register(MetricRegistry.name("workers", "number"),
    new Gauge[Int] {
      override def getValue: Int = master.workers.size
  })

  // Gauge for application numbers in cluster
  metricRegistry.register(MetricRegistry.name("apps", "number"),
    new Gauge[Int] {
      override def getValue: Int = master.apps.size
  })

  // Gauge for waiting application numbers in cluster
  metricRegistry.register(MetricRegistry.name("waiting_apps", "number"),
    new Gauge[Int] {
      override def getValue: Int = master.waitingApps.size
  })
}
\ No newline at end of file
...@@ -4,86 +4,42 @@ import com.codahale.metrics.{Gauge, Metric} ...@@ -4,86 +4,42 @@ import com.codahale.metrics.{Gauge, Metric}
import java.util.{Map, HashMap => JHashMap} import java.util.{Map, HashMap => JHashMap}
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import spark.metrics.source.Source import spark.metrics.source.Source
/**
 * Metrics source for a standalone Worker. Registers gauges for executor count
 * and core/memory usage into its own MetricRegistry, which the MetricsSystem
 * later mounts under this source's name.
 */
private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
  val className = classOf[Worker].getName()

  val sourceName = "worker"

  // Per-source registry; the MetricsSystem namespaces it by sourceName.
  val metricRegistry = new MetricRegistry()

  // Gauge for executors number
  metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
    new Gauge[Int] {
      override def getValue: Int = worker.executors.size
  })

  // Gauge for cores used of this worker
  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
    new Gauge[Int] {
      override def getValue: Int = worker.coresUsed
  })

  // Gauge for memory used of this worker
  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
    new Gauge[Int] {
      override def getValue: Int = worker.memoryUsed
  })

  // Gauge for cores free of this worker
  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
    new Gauge[Int] {
      override def getValue: Int = worker.coresFree
  })

  // Gauge for memory free of this worker
  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
    new Gauge[Int] {
      override def getValue: Int = worker.memoryFree
  })
}
//private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
// var workerInst: Option[Worker] = None
// val metricRegistry = new MetricRegistry()
//
// override def registryHandler = metricRegistry
//
// override def instance = "worker"
//
// def initialize(worker: Worker) {
// workerInst = Some(worker)
//
// registerSources()
// registerSinks()
// }
//
// def uninitialize() {
// unregisterSinks()
// }
//
// // Gauge for executors number
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
// })
//
// // Gauge for cores used of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
// })
//
// // Gauge for memory used of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
// })
//
// // Gauge for cores free of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
// })
//
// // Gauge for memory used of this worker
// metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
// new Gauge[Int] {
// override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
// })
//}
\ No newline at end of file
...@@ -5,6 +5,7 @@ import scala.collection.mutable ...@@ -5,6 +5,7 @@ import scala.collection.mutable
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry} import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties import java.util.Properties
//import java.util._
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import spark.Logging import spark.Logging
...@@ -19,10 +20,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -19,10 +20,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinks = new mutable.ArrayBuffer[Sink] val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source] val sources = new mutable.ArrayBuffer[Source]
var registry = new MetricRegistry()
registerSources()
registerSinks()
def start() { def start() {
registerSources() sinks.foreach(_.start)
registerSinks()
} }
def stop() { def stop() {
...@@ -31,20 +35,20 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -31,20 +35,20 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
def registerSource(source: Source) { def registerSource(source: Source) {
sources += source sources += source
MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet]) registry.register(source.sourceName,source.metricRegistry)
} }
def registerSources() { def registerSources() {
val instConfig = metricsConfig.getInstance(instance) val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX) val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance // Register all the sources related to instance
sourceConfigs.foreach { kv => sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class") val classPath = kv._2.getProperty("class")
try { try {
val source = Class.forName(classPath).newInstance() val source = Class.forName(classPath).newInstance()
sources += source.asInstanceOf[Source] sources += source.asInstanceOf[Source]
MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet]) registerSource(source.asInstanceOf[Source])
} catch { } catch {
case e: Exception => logError("source class " + classPath + " cannot be instantialized", e) case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
} }
...@@ -56,7 +60,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -56,7 +60,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX) val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
// Register JMX sink as a default sink // Register JMX sink as a default sink
sinks += new JmxSink(MetricsSystem.registry) sinks += new JmxSink(registry)
// Register other sinks according to conf // Register other sinks according to conf
sinkConfigs.foreach { kv => sinkConfigs.foreach { kv =>
...@@ -68,19 +72,16 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin ...@@ -68,19 +72,16 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
} }
try { try {
val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry]) val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, MetricsSystem.registry) .newInstance(kv._2, registry)
sinks += sink.asInstanceOf[Sink] sinks += sink.asInstanceOf[Sink]
} catch { } catch {
case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e) case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
} }
} }
sinks.foreach(_.start)
} }
} }
private[spark] object MetricsSystem { private[spark] object MetricsSystem {
val registry = new MetricRegistry()
val DEFAULT_SINKS = Map( val DEFAULT_SINKS = Map(
"console" -> "spark.metrics.sink.ConsoleSink", "console" -> "spark.metrics.sink.ConsoleSink",
"csv" -> "spark.metrics.sink.CsvSink") "csv" -> "spark.metrics.sink.CsvSink")
...@@ -96,4 +97,4 @@ private[spark] object MetricsSystem { ...@@ -96,4 +97,4 @@ private[spark] object MetricsSystem {
"day" -> TimeUnit.DAYS) "day" -> TimeUnit.DAYS)
def createMetricsSystem(instance: String) = new MetricsSystem(instance) def createMetricsSystem(instance: String) = new MetricsSystem(instance)
} }
\ No newline at end of file
...@@ -18,14 +18,13 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend ...@@ -18,14 +18,13 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT) case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
} }
var reporter: ConsoleReporter = _ var reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
override def start() {
reporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS) .convertRatesTo(TimeUnit.SECONDS)
.build() .build()
override def start() {
reporter.start(pollPeriod, pollUnit) reporter.start(pollPeriod, pollUnit)
} }
...@@ -40,4 +39,4 @@ object ConsoleSink { ...@@ -40,4 +39,4 @@ object ConsoleSink {
val CONSOLE_KEY_PERIOD = "period" val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit" val CONSOLE_KEY_UNIT = "unit"
} }
\ No newline at end of file
...@@ -24,15 +24,14 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si ...@@ -24,15 +24,14 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
case None => CsvSink.CSV_DEFAULT_DIR case None => CsvSink.CSV_DEFAULT_DIR
} }
var reporter: CsvReporter = _ var reporter: CsvReporter = CsvReporter.forRegistry(registry)
override def start() {
reporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US) .formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS) .convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir)) .build(new File(pollDir))
override def start() {
reporter.start(pollPeriod, pollUnit) reporter.start(pollPeriod, pollUnit)
} }
......
...@@ -3,10 +3,9 @@ package spark.metrics.sink ...@@ -3,10 +3,9 @@ package spark.metrics.sink
import com.codahale.metrics.{JmxReporter, MetricRegistry} import com.codahale.metrics.{JmxReporter, MetricRegistry}
class JmxSink(registry: MetricRegistry) extends Sink { class JmxSink(registry: MetricRegistry) extends Sink {
var reporter: JmxReporter = _ var reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() { override def start() {
reporter = JmxReporter.forRegistry(registry).build()
reporter.start() reporter.start()
} }
...@@ -14,4 +13,4 @@ class JmxSink(registry: MetricRegistry) extends Sink { ...@@ -14,4 +13,4 @@ class JmxSink(registry: MetricRegistry) extends Sink {
reporter.stop() reporter.stop()
} }
} }
\ No newline at end of file
...@@ -2,22 +2,16 @@ package spark.metrics.source ...@@ -2,22 +2,16 @@ package spark.metrics.source
import java.util.{Map, HashMap => JHashMap} import java.util.{Map, HashMap => JHashMap}
import com.codahale.metrics.Metric import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet} import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
/**
 * Metrics source exposing JVM-level metrics (garbage collection and memory
 * usage) by registering codahale's built-in metric sets into this source's
 * MetricRegistry.
 */
class JvmSource extends Source {
  val sourceName = "jvm"
  val metricRegistry = new MetricRegistry()

  val gcMetricSet = new GarbageCollectorMetricSet
  val memGaugeSet = new MemoryUsageGaugeSet

  // registerAll flattens each MetricSet's entries into this registry.
  metricRegistry.registerAll(gcMetricSet)
  metricRegistry.registerAll(memGaugeSet)
}
\ No newline at end of file
package spark.metrics.source package spark.metrics.source
import com.codahale.metrics.MetricSet import com.codahale.metrics.MetricSet
import com.codahale.metrics.MetricRegistry
/**
 * A source of metrics. Implementations expose their metrics through a
 * dedicated MetricRegistry; the MetricsSystem mounts each source's registry
 * under its `sourceName`.
 */
trait Source {
  // Logical name used to namespace this source's metrics.
  def sourceName: String
  // Registry holding all metrics produced by this source.
  def metricRegistry: MetricRegistry
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment