Skip to content
Snippets Groups Projects
Commit 37bc64a2 authored by Patrick Wendell's avatar Patrick Wendell
Browse files

Adding application-level metrics.

This adds metrics for applications in the deploy Master.
parent 5e7b38fb
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
val appSource = new ApplicationSource(this)
private var nextExecutorId = 0
......
package spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
/**
 * Metrics source that publishes gauges describing a single application.
 *
 * Three gauges are registered in this source's own registry when it is constructed:
 *  - "status": `application.state.toString`
 *  - "runtime_ms": `application.duration`
 *  - the name built from ("cores", "number"): `application.coresGranted`
 *
 * Every gauge reads from `application` each time it is polled; nothing is cached here.
 *
 * @param application the application whose fields back the gauges
 */
class ApplicationSource(val application: ApplicationInfo) extends Source {
  val metricRegistry = new MetricRegistry()

  // The source name embeds the wall-clock time at construction; presumably this keeps
  // sources of applications that share a name apart -- confirm against the registry naming.
  val sourceName = "%s.%s.%s".format("application", application.desc.name,
    System.currentTimeMillis())

  // Current application state, rendered as a string.
  private val statusGauge = new Gauge[String] {
    override def getValue: String = application.state.toString
  }

  // Application duration as reported by ApplicationInfo.
  private val runtimeGauge = new Gauge[Long] {
    override def getValue: Long = application.duration
  }

  // Cores currently granted to the application.
  private val coresGauge = new Gauge[Int] {
    override def getValue: Int = application.coresGranted
  }

  // Registration order matches the declaration order above.
  metricRegistry.register(MetricRegistry.name("status"), statusGauge)
  metricRegistry.register(MetricRegistry.name("runtime_ms"), runtimeGauge)
  metricRegistry.register(MetricRegistry.name("cores", "number"), coresGauge)
}
......@@ -38,6 +38,7 @@ import spark.util.AkkaUtils
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retained_applications", "1000").toInt
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
......@@ -59,7 +60,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
val metricsSystem = MetricsSystem.createMetricsSystem("master")
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
......@@ -79,13 +81,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
metricsSystem.registerSource(masterSource)
metricsSystem.start()
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
}
// Shutdown hook: stops the web UI first, then each metrics system in turn.
// NOTE(review): both `metricsSystem` and `masterMetricsSystem` are stopped here; this
// text appears to show the pre- and post-change lines of a diff together -- confirm
// which of the two is live before relying on this method.
override def postStop() {
webUi.stop()
metricsSystem.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
}
override def receive = {
......@@ -275,6 +279,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
......@@ -300,7 +305,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp -= app.id
actorToApp -= app.driver
addressToApp -= app.driver.path.address
completedApps += app // Remember it in our history
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
......
......@@ -17,7 +17,7 @@
package spark.metrics
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import com.codahale.metrics._
import java.util.Properties
import java.util.concurrent.TimeUnit
......@@ -93,6 +93,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
/**
 * Unregisters a source from this metrics system.
 *
 * Drops `source` from the `sources` collection and removes from `registry` every metric
 * whose name starts with `source.sourceName`.
 *
 * @param source the source to remove
 */
def removeSource(source: Source) {
  sources -= source
  // Report through the logger rather than printing to stdout.
  logInfo("Removing source: " + source.sourceName)
  // NOTE(review): this is a plain prefix match, so metrics of any other source whose
  // name merely begins with this source's name would be removed too -- confirm that
  // source names cannot be prefixes of one another.
  registry.removeMatching(new MetricFilter {
    def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
  })
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment