diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 0486ca4c79213edb35f8f56a3123deecd289e029..63a5a2093ebaa746a7aecf4c63b03c74ff6d7faa 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -1,48 +1,45 @@
-# syntax: [instance].[sink|source].[name].[options]
-
-# "instance" specify "who" (the role) use metrics system. In spark there are
-# several roles like master, worker, executor, driver, these roles will
-# create metrics system for monitoring. So instance represents these roles.
-# Currently in Spark, several instances have already implemented: master,
-# worker, executor, driver.
-#
-# [instance] field can be "master", "worker", "executor", "driver", which means
-# only the specified instance has this property.
-# a wild card "*" can be used to represent instance name, which means all the
-# instances will have this property.
+# syntax: [instance].sink|source.[name].[options]=[value]
+
+# This file configures Spark's internal metrics system. The metrics system is
+# divided into instances which correspond to internal components.
+# Each instance can be configured to report its metrics to one or more sinks.
+# Accepted values for [instance] are "master", "worker", "executor", "driver",
+# and "applications". A wild card "*" can be used as an instance name, in
+# which case all instances will inherit the supplied property.
 #
-# "source" specify "where" (source) to collect metrics data. In metrics system,
-# there exists two kinds of source:
-# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
-# collect Spark component's internal state, these sources are related to
-# instance and will be added after specific metrics system is created.
-# 2. Common source, like JvmSource, which will collect low level state, is
-# configured by configuration and loaded through reflection.
+# Within an instance, a "source" specifies a particular set of grouped metrics.
+# There are two kinds of sources:
+# 1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
+# collect a Spark component's internal state. Each instance is paired with a
+# Spark source that is added automatically.
+# 2. Common sources, like JvmSource, which will collect low level state.
+# These can be added through configuration options and are then loaded
+# using reflection.
 #
-# "sink" specify "where" (destination) to output metrics data to. Several sinks
-# can be coexisted and flush metrics to all these sinks.
+# A "sink" specifies where metrics are delivered to. Each instance can be
+# assigned one or more sinks.
 #
-# [sink|source] field specify this property is source related or sink, this
-# field can only be source or sink.
+# The sink|source field specifies whether the property relates to a sink or
+# a source.
 #
-# [name] field specify the name of source or sink, this is custom defined.
+# The [name] field specifies the name of the source or sink.
 #
-# [options] field is the specific property of this source or sink, this source
-# or sink is responsible for parsing this property.
+# The [options] field is the specific property of this source or sink. The
+# source or sink is responsible for parsing this property.
 #
 # Notes:
-# 1. Sinks should be added through configuration, like console sink, class
-# full name should be specified by class property.
-# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
-# it should be attention minimal polling period is 1 seconds, any period
-# below than 1s is illegal.
-# 3. Wild card property can be overlapped by specific instance property, for
-# example, *.sink.console.period can be overlapped by master.sink.console.period.
+# 1. To add a new sink, set the "class" option to a fully qualified class
+# name (see examples below).
+# 2. Some sinks involve a polling period. The minimum allowed polling period
+# is 1 second.
+# 3. Wild card properties can be overridden by more specific properties.
+# For example, master.sink.console.period takes precedence over
+# *.sink.console.period.
 # 4. A metrics specific configuration
 # "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
-# added to Java property using -Dspark.metrics.conf=xxx if you want to
-# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
-# metrics system will search and load it automatically.
+# added to Java properties using -Dspark.metrics.conf=xxx if you want to
+# customize the metrics system. You can also put the file in ${SPARK_HOME}/conf
+# and it will be loaded automatically.
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
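As a concrete illustration of the [instance].sink|source.[name].[options]=[value] syntax described above, a hypothetical metrics.properties fragment (not part of this patch) routing master and application metrics to a console sink could look like the following; the ConsoleSink class name is assumed by analogy with the JmxSink example, and "period" is the polling option mentioned in the notes:

    # Report master and application metrics to a console sink, polled every 10 seconds
    master.sink.console.class=spark.metrics.sink.ConsoleSink
    master.sink.console.period=10
    applications.sink.console.class=spark.metrics.sink.ConsoleSink
    applications.sink.console.period=10
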
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
index 15ff9197382c8c06f5ae2173dbab0f0c37c6665d..79687df614e979430cd0c0aee9fd0540964ab702 100644
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
   var endTime = -1L
+  val appSource = new ApplicationSource(this)
 
   private var nextExecutorId = 0
 
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000000000000000000000000000000000000..4df2b6bfddd7503ef4b5b59b8e9b3c660ddfd3ec
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "%s.%s.%s".format("application", application.desc.name,
+    System.currentTimeMillis())
+
+  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+    override def getValue: String = application.state.toString
+  })
+
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+    override def getValue: Long = application.duration
+  })
+
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+    override def getValue: Int = application.coresGranted
+  })
+
+}
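The new ApplicationSource exposes its values as codahale Gauges: a Gauge holds no state of its own, and getValue is re-evaluated against the live ApplicationInfo each time a sink polls the registry. A minimal, self-contained sketch of that pattern (hypothetical names, not taken from the patch):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    // Hypothetical stand-in for a component whose state should be observable.
    class Job { var state = "WAITING" }

    object GaugeSketch {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()
        val job = new Job

        // The gauge stores no value; every read re-evaluates job.state.
        registry.register(MetricRegistry.name("job", "status"), new Gauge[String] {
          override def getValue: String = job.state
        })

        job.state = "RUNNING"
        val gauge = registry.getGauges.get("job.status").asInstanceOf[Gauge[String]]
        println(gauge.getValue) // prints RUNNING, the state at read time
      }
    }
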
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 202d5bcdb7f013cef97b78ced528401d371f40e5..0aed4b9802ef48c045a5b54d41c4e2193508cdb2 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -38,6 +38,7 @@ import spark.util.AkkaUtils
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
+  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
@@ -59,7 +60,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
   Utils.checkHost(host, "Expected hostname")
 
-  val metricsSystem = MetricsSystem.createMetricsSystem("master")
+  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
   val masterPublicAddress = {
@@ -79,13 +81,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
 
-    metricsSystem.registerSource(masterSource)
-    metricsSystem.start()
+    masterMetricsSystem.registerSource(masterSource)
+    masterMetricsSystem.start()
+    applicationMetricsSystem.start()
   }
 
   override def postStop() {
     webUi.stop()
-    metricsSystem.stop()
+    masterMetricsSystem.stop()
+    applicationMetricsSystem.stop()
   }
 
   override def receive = {
@@ -275,6 +279,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     val now = System.currentTimeMillis()
     val date = new Date(now)
     val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    applicationMetricsSystem.registerSource(app.appSource)
     apps += app
     idToApp(app.id) = app
     actorToApp(driver) = app
@@ -300,7 +305,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       idToApp -= app.id
       actorToApp -= app.driver
      addressToApp -= app.driver.path.address
-      completedApps += app // Remember it in our history
+      if (completedApps.size >= RETAINED_APPLICATIONS) {
+        val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+        completedApps.take(toRemove).foreach( a => {
+          applicationMetricsSystem.removeSource(a.appSource)
+        })
+        completedApps.trimStart(toRemove)
+      }
+      completedApps += app // Remember it in our history
       waitingApps -= app
       for (exec <- app.executors.values) {
         exec.worker.removeExecutor(exec)
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index fabddfb9476a54ed04f0caa22321e05ee630ffc1..1dacafa13517c66db4ccf20b28177672d9b2dc9c 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -17,7 +17,7 @@
 
 package spark.metrics
 
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 
 import java.util.Properties
 import java.util.concurrent.TimeUnit
@@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
     }
   }
 
+  def removeSource(source: Source) {
+    sources -= source
+    registry.removeMatching(new MetricFilter {
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+    })
+  }
+
   def registerSources() {
     val instConfig = metricsConfig.getInstance(instance)
     val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
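The removeSource method added here relies on every metric registered for a source being named with the source's sourceName as a prefix, so a single MetricFilter passed to removeMatching drops all of them at once. A standalone sketch of that pattern (illustrative metric names only, not taken from the patch):

    import com.codahale.metrics.{Gauge, Metric, MetricFilter, MetricRegistry}

    object RemoveByPrefixSketch {
      def main(args: Array[String]) {
        val registry = new MetricRegistry()

        // Two metrics belonging to a finished application and one master metric.
        registry.register("application.myApp.1.status", new Gauge[String] {
          override def getValue: String = "FINISHED"
        })
        registry.register("application.myApp.1.runtime_ms", new Gauge[Long] {
          override def getValue: Long = 1234L
        })
        registry.register("master.apps", new Gauge[Int] {
          override def getValue: Int = 1
        })

        // Drop everything whose name starts with the retired source's prefix,
        // mirroring what removeSource does when a completed application is trimmed.
        registry.removeMatching(new MetricFilter {
          def matches(name: String, metric: Metric): Boolean =
            name.startsWith("application.myApp.1")
        })

        println(registry.getNames) // only master.apps remains
      }
    }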