diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3a7c4e5a521b58e6c4bf0a8a7e579f737518454c..e44f5e316802ad9aed2e2a654ba0d18417a1597c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -29,12 +29,12 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
 import spark.deploy._
 import spark.{Logging, SparkException, Utils}
+import spark.metrics.MetricsSystem
 import spark.util.AkkaUtils
 import ui.MasterWebUI
 
-private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor
-with Logging with MasterInstrumentation {
+private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
@@ -57,6 +57,8 @@ with Logging with MasterInstrumentation {
   val webUi = new MasterWebUI(self, webUiPort)
 
   Utils.checkHost(host, "Expected hostname")
+
+  val masterInstrumentation = new MasterInstrumentation(this)
 
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
@@ -75,7 +77,7 @@ with Logging with MasterInstrumentation {
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
 
-    initialize(this)
+    Master.metricsSystem.registerSource(masterInstrumentation)
   }
 
   override def postStop() {
@@ -319,21 +321,22 @@ with Logging with MasterInstrumentation {
         removeWorker(worker)
       }
     }
-
-  override def postStop() {
-    uninitialize()
-  }
 }
 
 private[spark] object Master {
   private val systemName = "sparkMaster"
   private val actorName = "Master"
   private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+
+  private val metricsSystem = MetricsSystem.createMetricsSystem("master")
 
   def main(argStrings: Array[String]) {
     val args = new MasterArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
+
+    metricsSystem.start()
     actorSystem.awaitTermination()
+    metricsSystem.stop()
   }
 
   /** Returns an `akka://...` URL for the Master actor given a sparkUrl `spark://host:ip`.
     */
diff --git a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
index c295e725d7151db9d9c3eac3789b4dd5f3d6a55c..5ea9a90319fc0d9d08496c063d5504e4b9cb0131 100644
--- a/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterInstrumentation.scala
@@ -1,47 +1,35 @@
 package spark.deploy.master
 
-import com.codahale.metrics.{Gauge, JmxReporter, MetricRegistry}
+import java.util.{Map, HashMap => JHashMap}
 
-import spark.metrics.AbstractInstrumentation
+import com.codahale.metrics.{Gauge, Metric}
 
-private[spark] trait MasterInstrumentation extends AbstractInstrumentation {
-  var masterInst: Option[Master] = None
-  val metricRegistry = new MetricRegistry()
-
-  override def registryHandler = metricRegistry
-
-  override def instance = "master"
+import spark.metrics.source.Source
+
+private[spark] class MasterInstrumentation(val master: Master) extends Source {
+  val className = classOf[Master].getName()
+  val instrumentationName = "master"
+
+  override def sourceName = instrumentationName
 
-  def initialize(master: Master) {
-    masterInst = Some(master)
+  override def getMetrics(): Map[String, Metric] = {
+    val gauges = new JHashMap[String, Metric]
 
-    // Register all the sources
-    registerSources()
+    // Gauge for the number of workers in the cluster
+    gauges.put(className + ".workers.number", new Gauge[Int] {
+      override def getValue: Int = master.workers.size
+    })
 
-    // Register and start all the sinks
-    registerSinks()
-  }
-
-  def uninitialize() {
-    unregisterSinks()
+    // Gauge for the number of applications in the cluster
+    gauges.put(className + ".apps.number", new Gauge[Int] {
+      override def getValue: Int = master.apps.size
+    })
+
+    // Gauge for the number of waiting applications in the cluster
+    gauges.put(className + ".waiting_apps.number", new Gauge[Int] {
+      override def getValue: Int = master.waitingApps.size
+    })
+
+    gauges
   }
-
-  // Gauge for worker numbers in cluster
-  metricRegistry.register(MetricRegistry.name(classOf[Master], "workers", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = masterInst.map(_.workers.size).getOrElse(0)
-  })
-
-  // Gauge for application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(classOf[Master], "apps", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = masterInst.map(_.apps.size).getOrElse(0)
-  })
-
-  // Gauge for waiting application numbers in cluster
-  metricRegistry.register(MetricRegistry.name(classOf[Master], "waiting_apps", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = masterInst.map(_.waitingApps.size).getOrElse(0)
-  })
-
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index b64bdb8d28c0d5791859ccd6915707ddf7454517..eaa1c1806f4b281e482284569911e1fa70049e69 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -23,6 +23,7 @@ import akka.util.duration._
 import spark.{Logging, Utils}
 import spark.util.AkkaUtils
 import spark.deploy._
+import spark.metrics.MetricsSystem
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import java.text.SimpleDateFormat
 import java.util.Date
@@ -41,7 +42,7 @@ private[spark] class Worker(
     memory: Int,
     masterUrl: String,
     workDirPath: String = null)
-  extends Actor with Logging with WorkerInstrumentation {
+  extends Actor with Logging {
 
   Utils.checkHost(host, "Expected hostname")
   assert (port > 0)
@@ -67,6 +68,8 @@ private[spark] class Worker(
   var coresUsed = 0
   var memoryUsed = 0
 
+  val workerInstrumentation = new WorkerInstrumentation(this)
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
@@ -99,7 +102,8 @@ private[spark] class Worker(
     connectToMaster()
     startWebUi()
 
-    initialize(this)
+    Worker.metricsSystem.registerSource(workerInstrumentation)
+    Worker.metricsSystem.start()
   }
 
   def connectToMaster() {
@@ -182,11 +186,13 @@ private[spark] class Worker(
     executors.values.foreach(_.kill())
     webUi.stop()
 
-    uninitialize()
+    Worker.metricsSystem.stop()
   }
 }
 
 private[spark] object Worker {
+  private val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+
   def main(argStrings: Array[String]) {
     val args = new WorkerArguments(argStrings)
     val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
index 2f725300b5cbd1eb7f55c2cc05e10dae4b604612..37fd154859915f868be91db6eedb212ffab7eadd 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerInstrumentation.scala
@@ -1,58 +1,44 @@
 package spark.deploy.worker
 
-import com.codahale.metrics.{JmxReporter, Gauge, MetricRegistry}
+import com.codahale.metrics.{Gauge, Metric}
 
-import spark.metrics.AbstractInstrumentation
+import java.util.{Map, HashMap => JHashMap}
 
-private[spark] trait WorkerInstrumentation extends AbstractInstrumentation {
-  var workerInst: Option[Worker] = None
-  val metricRegistry = new MetricRegistry()
-
-  override def registryHandler = metricRegistry
-
-  override def instance = "worker"
+import spark.metrics.source.Source
+
+private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
+  val className = classOf[Worker].getName()
 
-  def initialize(worker: Worker) {
-    workerInst = Some(worker)
+  override def sourceName = "worker"
 
-    // Register all the sources
-    registerSources()
+  override def getMetrics: Map[String, Metric] = {
+    val gauges = new JHashMap[String, Metric]
 
-    // Register and start all the sinks
-    registerSinks()
-  }
-
-  def uninitialize() {
-    unregisterSinks()
+    // Gauge for the number of executors
+    gauges.put(className + ".executor.number", new Gauge[Int]{
+      override def getValue: Int = worker.executors.size
+    })
+
+    // Gauge for cores used by this worker
+    gauges.put(className + ".core_used.number", new Gauge[Int]{
+      override def getValue: Int = worker.coresUsed
+    })
+
+    // Gauge for memory used by this worker, in MB
+    gauges.put(className + ".mem_used.MBytes", new Gauge[Int]{
+      override def getValue: Int = worker.memoryUsed
+    })
+
+    // Gauge for cores free on this worker
+    gauges.put(className + ".core_free.number", new Gauge[Int]{
+      override def getValue: Int = worker.coresFree
+    })
+
+    // Gauge for memory free on this worker, in MB
+    gauges.put(className + ".mem_free.MBytes", new Gauge[Int]{
+      override def getValue: Int = worker.memoryFree
+    })
+
+    gauges
   }
-
-  // Gauge for executors number
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "executor", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = workerInst.map(_.executors.size).getOrElse(0)
-  })
-
-  // Gauge for cores used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_used", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = workerInst.map(_.coresUsed).getOrElse(0)
-  })
-
-  // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_used", "MBytes"),
-    new Gauge[Int] {
-      override def getValue: Int = workerInst.map(_.memoryUsed).getOrElse(0)
-  })
-
-  // Gauge for cores free of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "core_free", "number"),
-    new Gauge[Int] {
-      override def getValue: Int = workerInst.map(_.coresFree).getOrElse(0)
-  })
-
-  // Gauge for memory used of this worker
-  metricRegistry.register(MetricRegistry.name(classOf[Worker], "mem_free", "MBytes"),
-    new Gauge[Int] {
-      override def getValue: Int = workerInst.map(_.memoryFree).getOrElse(0)
-  })
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
similarity index 64%
rename from core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
rename to core/src/main/scala/spark/metrics/MetricsSystem.scala
index 9cae1e0220a1bfb188e706c3db2fb060d3a08579..ea1bc490b54f7043ffad299981e5975e4c4ebb35 100644
--- a/core/src/main/scala/spark/metrics/AbstractInstrumentation.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -2,7 +2,7 @@ package spark.metrics
 
 import scala.collection.mutable
 
-import com.codahale.metrics.{JmxReporter, MetricRegistry}
+import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
 
 import java.util.Properties
 import java.util.concurrent.TimeUnit
@@ -11,70 +11,76 @@ import spark.Logging
 import spark.metrics.sink._
 import spark.metrics.source._
 
-private [spark] trait AbstractInstrumentation extends Logging {
+private[spark] class MetricsSystem private (val instance: String) extends Logging {
   initLogging()
 
-  // Get MetricRegistry handler
-  def registryHandler: MetricRegistry
-  // Get the instance name
-  def instance: String
-
   val confFile = System.getProperty("spark.metrics.conf.file", MetricsConfig.DEFAULT_CONFIG_FILE)
   val metricsConfig = new MetricsConfig(confFile)
 
   val sinks = new mutable.ArrayBuffer[Sink]
   val sources = new mutable.ArrayBuffer[Source]
 
+  def start() {
+    registerSources()
+    registerSinks()
+  }
+
+  def stop() {
+    sinks.foreach(_.stop)
+  }
+
+  def registerSource(source: Source) {
+    sources += source
+    MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
+  }
+
   def registerSources() {
     val instConfig = metricsConfig.getInstance(instance)
-    val sourceConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SOURCE_REGEX)
+    val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
 
-    // Register all the sources
+    // Register all the sources related to this instance
     sourceConfigs.foreach { kv =>
      val classPath = kv._2.getProperty("class")
      try {
-        val source = Class.forName(classPath).getConstructor(classOf[MetricRegistry])
-          .newInstance(registryHandler)
+        val source = Class.forName(classPath).newInstance()
         sources += source.asInstanceOf[Source]
+        MetricsSystem.registry.registerAll(source.asInstanceOf[MetricSet])
      } catch {
        case e: Exception => logError("source class " + classPath + " cannot be instantialized", e)
      }
    }
-    sources.foreach(_.registerSource)
  }
 
  def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
-    val sinkConfigs = MetricsConfig.subProperties(instConfig, AbstractInstrumentation.SINK_REGEX)
+    val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
 
    // Register JMX sink as a default sink
-    sinks += new JmxSink(registryHandler)
+    sinks += new JmxSink(MetricsSystem.registry)
 
    // Register other sinks according to conf
    sinkConfigs.foreach { kv =>
-      val classPath = if (AbstractInstrumentation.DEFAULT_SINKS.contains(kv._1)) {
-        AbstractInstrumentation.DEFAULT_SINKS(kv._1)
+      val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
+        MetricsSystem.DEFAULT_SINKS(kv._1)
      } else {
        // For non-default sink, a property class should be set and create using reflection
        kv._2.getProperty("class")
      }
      try {
        val sink = Class.forName(classPath).getConstructor(classOf[Properties], classOf[MetricRegistry])
-          .newInstance(kv._2, registryHandler)
+          .newInstance(kv._2, MetricsSystem.registry)
        sinks += sink.asInstanceOf[Sink]
      } catch {
        case e: Exception => logError("sink class " + classPath + " cannot be instantialized", e)
      }
    }
-    sinks.foreach(_.registerSink)
+    sinks.foreach(_.start)
  }
-
-  def unregisterSinks() {
-    sinks.foreach(_.unregisterSink)
-  }
 }
 
-object AbstractInstrumentation {
+private[spark] object MetricsSystem {
+  val registry = new MetricRegistry()
+
   val DEFAULT_SINKS = Map(
     "console" -> "spark.metrics.sink.ConsoleSink",
     "csv" -> "spark.metrics.sink.CsvSink")
@@ -88,4 +94,6 @@ object AbstractInstrumentation {
     "minute" -> TimeUnit.MINUTES,
     "hour" -> TimeUnit.HOURS,
     "day" -> TimeUnit.DAYS)
+
+  def createMetricsSystem(instance: String) = new MetricsSystem(instance)
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index 5426af8c4c271e103064aea91bf1979871c7f239..b49a211fb3f22f852b5168ab814605142b797983 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
 
-import spark.metrics.AbstractInstrumentation
+import spark.metrics.MetricsSystem
 
 class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
   val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD)) match {
@@ -14,13 +14,13 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
   }
 
   val pollUnit = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT)) match {
-    case Some(s) => AbstractInstrumentation.timeUnits(s)
-    case None => AbstractInstrumentation.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
+    case Some(s) => MetricsSystem.timeUnits(s)
+    case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
   }
 
   var reporter: ConsoleReporter = _
 
-  override def registerSink() {
+  override def start() {
     reporter = ConsoleReporter.forRegistry(registry)
         .convertDurationsTo(TimeUnit.MILLISECONDS)
         .convertRatesTo(TimeUnit.SECONDS)
@@ -29,7 +29,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry) extend
     reporter.start(pollPeriod, pollUnit)
   }
 
-  override def unregisterSink() {
+  override def stop() {
     reporter.stop()
   }
 }
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index 1d663f6cffd0165c10a58fe04324e4a199211068..3f572c8e051822fb812ba579ee6607fd0980c8a1 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit
 
 import com.codahale.metrics.{CsvReporter, MetricRegistry}
 
-import spark.metrics.AbstractInstrumentation
+import spark.metrics.MetricsSystem
 
 class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
   val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD)) match {
@@ -15,8 +15,8 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
   }
 
   val pollUnit = Option(property.getProperty(CsvSink.CSV_KEY_UNIT)) match {
-    case Some(s) => AbstractInstrumentation.timeUnits(s)
-    case None => AbstractInstrumentation.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
+    case Some(s) => MetricsSystem.timeUnits(s)
+    case None => MetricsSystem.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
   }
 
   val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR)) match {
@@ -26,7 +26,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
 
   var reporter: CsvReporter = _
 
-  override def registerSink() {
+  override def start() {
     reporter = CsvReporter.forRegistry(registry)
         .formatFor(Locale.US)
         .convertDurationsTo(TimeUnit.MILLISECONDS)
@@ -36,7 +36,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
     reporter.start(pollPeriod, pollUnit)
   }
 
-  override def unregisterSink() {
+  override def stop() {
     reporter.stop()
   }
 }
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index 56e567770014f2106cc2a6f735813b8bba020d90..e223dc26e94ce561139c47bc829831baa5143944 100644
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -5,12 +5,12 @@ import com.codahale.metrics.{JmxReporter, MetricRegistry}
 class JmxSink(registry: MetricRegistry) extends Sink {
   var reporter: JmxReporter = _
 
-  override def registerSink() {
+  override def start() {
     reporter = JmxReporter.forRegistry(registry).build()
     reporter.start()
   }
 
-  override def unregisterSink() {
+  override def stop() {
     reporter.stop()
   }
 
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 26052b7231b40dccac301e34a76c10a34c297f7e..9fef894fde1cf4c59447677c310773ef77bc4e4c 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -1,7 +1,7 @@
 package spark.metrics.sink
 
 trait Sink {
-  def registerSink: Unit
+  def start: Unit
 
-  def unregisterSink: Unit
+  def stop: Unit
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
index 8f6bf48843acc0843c81b6058b41c02bdcdead3e..7a7c1b6ffb6575f3b95daffbb11072687e3d8019 100644
--- a/core/src/main/scala/spark/metrics/source/JvmSource.scala
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -1,17 +1,23 @@
 package spark.metrics.source
 
-import com.codahale.metrics.MetricRegistry
-import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
+import java.util.{Map, HashMap => JHashMap}
 
-class JvmSource(registry: MetricRegistry) extends Source {
-  // Initialize memory usage gauge for jvm
-  val memUsageMetricSet = new MemoryUsageGaugeSet
-
-  // Initialize garbage collection usage gauge for jvm
-  val gcMetricSet = new GarbageCollectorMetricSet
-
-  override def registerSource() {
-    registry.registerAll(memUsageMetricSet)
-    registry.registerAll(gcMetricSet)
+import com.codahale.metrics.Metric
+import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
+
+class JvmSource extends Source {
+  override def sourceName = "jvm"
+
+  override def getMetrics(): Map[String, Metric] = {
+    val gauges = new JHashMap[String, Metric]
+
+    import scala.collection.JavaConversions._
+    val gcMetricSet = new GarbageCollectorMetricSet
+    gcMetricSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
+
+    val memGaugeSet = new MemoryUsageGaugeSet
+    memGaugeSet.getMetrics.foreach(kv => gauges.put(kv._1, kv._2))
+
+    gauges
   }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
index 35cfe0c8ff1bfdde4b52203e5b6912346e74f9ca..edd59de46a95d4ab19dd2e38c2cd150cc931a49d 100644
--- a/core/src/main/scala/spark/metrics/source/Source.scala
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -1,5 +1,7 @@
 package spark.metrics.source
 
-trait Source {
-  def registerSource: Unit
+import com.codahale.metrics.MetricSet
+
+trait Source extends MetricSet {
+  def sourceName: String
 }
\ No newline at end of file
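
Note for readers: after this change, a metrics source is just a class that extends `spark.metrics.source.Source` (which now extends Codahale's `MetricSet`) and is handed to a `MetricsSystem`, as Master and Worker do above. A minimal sketch of a custom source, assuming only the API in this patch — the `UptimeSource` class and its metric name are illustrative, not part of the change:

```scala
package spark.metrics.source

import java.util.{Map, HashMap => JHashMap}

import com.codahale.metrics.{Gauge, Metric}

class UptimeSource extends Source {
  // Recorded once, when the source is constructed
  private val startTime = System.currentTimeMillis()

  override def sourceName = "uptime"

  override def getMetrics(): Map[String, Metric] = {
    val gauges = new JHashMap[String, Metric]

    // Gauge for milliseconds elapsed since this source was created
    gauges.put("uptime.milliseconds", new Gauge[Long] {
      override def getValue: Long = System.currentTimeMillis() - startTime
    })

    gauges
  }
}
```

Since `MetricsSystem.createMetricsSystem` is `private[spark]`, wiring it up would happen inside the spark package, mirroring Master and Worker:

```scala
val metricsSystem = MetricsSystem.createMetricsSystem("master")
metricsSystem.registerSource(new UptimeSource)
metricsSystem.start()  // also registers conf-declared sources and starts all sinks
// ...
metricsSystem.stop()
```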