Commit ed1a3bc2 authored by Andrew xia, committed by jerryshao

continue to refactor code style and functions

parent 5730193e
Showing changes with 189 additions and 201 deletions
......@@ -274,9 +274,9 @@ class SparkContext(
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
- def initDriverMetrics() = {
- SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
- SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
+ def initDriverMetrics() {
+ SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
+ SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
......
......@@ -57,7 +57,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val webUi = new MasterWebUI(self, webUiPort)
Utils.checkHost(host, "Expected hostname")
val masterInstrumentation = new MasterInstrumentation(this)
val masterPublicAddress = {
......@@ -76,7 +76,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
Master.metricsSystem.registerSource(masterInstrumentation)
Master.metricsSystem.start()
}
......@@ -322,7 +322,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
removeWorker(worker)
}
}
override def postStop() {
Master.metricsSystem.stop()
}
......
......@@ -5,24 +5,21 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class MasterInstrumentation(val master: Master) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "master"
- metricRegistry.register(MetricRegistry.name("workers","number"),
- new Gauge[Int] {
+ // Gauge for worker numbers in cluster
+ metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
+ // Gauge for application numbers in cluster
- metricRegistry.register(MetricRegistry.name("apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = master.apps.size
+ metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.apps.size
})
+ // Gauge for waiting application numbers in cluster
- metricRegistry.register(MetricRegistry.name("waiting_apps", "number"),
- new Gauge[Int] {
- override def getValue: Int = master.waitingApps.size
+ metricRegistry.register(MetricRegistry.name("waiting_apps", "number"), new Gauge[Int] {
+ override def getValue: Int = master.waitingApps.size
})
}
......@@ -8,32 +8,27 @@ private[spark] class WorkerInstrumentation(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executor", "number"),
new Gauge[Int] {
override def getValue: Int = worker.executors.size
metricRegistry.register(MetricRegistry.name("executor", "number"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("core_used", "number"),
new Gauge[Int] {
override def getValue: Int = worker.coresUsed
metricRegistry.register(MetricRegistry.name("core_used", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"),
new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
metricRegistry.register(MetricRegistry.name("mem_used", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name("core_free", "number"),
new Gauge[Int] {
override def getValue: Int = worker.coresFree
metricRegistry.register(MetricRegistry.name("core_free", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"),
new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
metricRegistry.register(MetricRegistry.name("mem_free", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}
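The two instrumentation classes above follow one pattern: a class extends spark.metrics.source.Source, names itself via sourceName, and registers Codahale gauges on its own MetricRegistry. The sketch below is not part of the commit; it is a hypothetical source written in the refactored single-expression style, assuming only that the Source trait exposes the two members (sourceName, metricRegistry) used throughout this diff.

import java.util.concurrent.atomic.AtomicInteger
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source

// Hypothetical example source, not part of this commit.
private[spark] class ConnectionCountSource extends Source {
  val sourceName = "connections"
  val metricRegistry = new MetricRegistry()

  val activeConnections = new AtomicInteger(0)

  // Gauge registered inline, mirroring the refactored style used above
  metricRegistry.register(MetricRegistry.name("connections", "active"), new Gauge[Int] {
    override def getValue: Int = activeConnections.get()
  })
}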
......@@ -69,7 +69,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
override def uncaughtException(thread: Thread, exception: Throwable) {
try {
logError("Uncaught exception in thread " + thread, exception)
// We may have been called from a shutdown hook. If so, we must not call System.exit().
// (If we do, we will deadlock.)
if (!Utils.inShutdown()) {
......@@ -86,14 +86,14 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
}
)
val executorInstrumentation = new ExecutorInstrumentation(this)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
- SparkEnv.set(env)
+ SparkEnv.set(env)
env.metricsSystem.registerSource(executorInstrumentation)
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool
......
......@@ -4,32 +4,27 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
- class ExecutorInstrumentation(val executor: Executor) extends Source{
+ class ExecutorInstrumentation(val executor: Executor) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
- metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getActiveCount()
+ metricRegistry.register(MetricRegistry.name("threadpool", "active_task", "number"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
- metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"),
- new Gauge[Long] {
- override def getValue: Long = executor.threadPool.getCompletedTaskCount()
+ metricRegistry.register(MetricRegistry.name("threadpool", "complete_task", "count"), new Gauge[Long] {
+ override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
- metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getPoolSize()
+ metricRegistry.register(MetricRegistry.name("threadpool", "current_pool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getPoolSize()
})
- // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
- metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"),
- new Gauge[Int] {
- override def getValue: Int = executor.threadPool.getMaximumPoolSize()
- })
- }
\ No newline at end of file
+ // Gauge for executor thread pool's largest number of threads that have ever simultaneously been in the pool
+ metricRegistry.register(MetricRegistry.name("threadpool", "max_pool", "size"), new Gauge[Int] {
+ override def getValue: Int = executor.threadPool.getMaximumPoolSize()
+ })
+ }
......@@ -3,63 +3,63 @@ package spark.metrics
import java.util.Properties
import java.io.{File, FileInputStream}
- import scala.collection.mutable
+ import scala.collection.mutable.HashMap
import scala.util.matching.Regex
- private [spark] class MetricsConfig(val configFile: String) {
+ private[spark] class MetricsConfig(val configFile: String) {
val properties = new Properties()
- // Add default properties in case there's no properties file
- MetricsConfig.setDefaultProperties(properties)
- val confFile = new File(configFile)
- if (confFile.exists()) {
- var fis: FileInputStream = null
- try {
- fis = new FileInputStream(configFile)
- properties.load(fis)
- } finally {
- fis.close()
- }
- }
- val propertyCategories = MetricsConfig.subProperties(properties, MetricsConfig.INSTANCE_REGEX)
- if (propertyCategories.contains(MetricsConfig.DEFAULT_PREFIX)) {
- import scala.collection.JavaConversions._
- val defaultProperty = propertyCategories(MetricsConfig.DEFAULT_PREFIX)
- for ((inst, prop) <- propertyCategories; p <- defaultProperty
- if inst != MetricsConfig.DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
- prop.setProperty(p._1, p._2)
- }
- }
- def getInstance(inst: String) = {
- propertyCategories.get(inst) match {
- case Some(s) => s
- case None => propertyCategories(MetricsConfig.DEFAULT_PREFIX)
- }
- }
- }
- private[spark] object MetricsConfig {
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
- def setDefaultProperties(prop: Properties) {
+ var propertyCategories: HashMap[String, Properties] = null
+ private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.jmx.enabled", "default")
prop.setProperty("*.source.jvm.class", "spark.metrics.source.JvmSource")
}
- def subProperties(prop: Properties, regex: Regex) = {
- val subProperties = new mutable.HashMap[String, Properties]
+ def initilize() {
+ // Add default properties in case there's no properties file
+ setDefaultProperties(properties)
+ val confFile = new File(configFile)
+ if (confFile.exists()) {
+ var fis: FileInputStream = null
+ try {
+ fis = new FileInputStream(configFile)
+ properties.load(fis)
+ } finally {
+ fis.close()
+ }
+ }
+ propertyCategories = subProperties(properties, INSTANCE_REGEX)
+ if (propertyCategories.contains(DEFAULT_PREFIX)) {
+ import scala.collection.JavaConversions._
+ val defaultProperty = propertyCategories(DEFAULT_PREFIX)
+ for ((inst, prop) <- propertyCategories; p <- defaultProperty
+ if inst != DEFAULT_PREFIX; if prop.getProperty(p._1) == null) {
+ prop.setProperty(p._1, p._2)
+ }
+ }
+ }
+ def subProperties(prop: Properties, regex: Regex): HashMap[String, Properties] = {
+ val subProperties = new HashMap[String, Properties]
import scala.collection.JavaConversions._
- prop.foreach { kv =>
+ prop.foreach { kv =>
if (regex.findPrefixOf(kv._1) != None) {
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
}
}
subProperties
}
- }
\ No newline at end of file
+ def getInstance(inst: String): Properties = {
+ propertyCategories.get(inst) match {
+ case Some(s) => s
+ case None => propertyCategories(DEFAULT_PREFIX)
+ }
+ }
+ }
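The refactored MetricsConfig reads a flat java.util.Properties file, splits every key of the form instance.rest by INSTANCE_REGEX, and then merges the "*" defaults into each concrete instance. The snippet below is a standalone sketch of that grouping step, not the Spark class itself: the object name is hypothetical, the sample keys and values are taken from the test assertions later in this diff, and it targets the same Scala generation as the diff (scala.collection.JavaConversions, procedure syntax).

import java.util.Properties
import scala.collection.mutable.HashMap
import scala.util.matching.Regex

// Standalone sketch of the prefix grouping done by MetricsConfig.subProperties above.
object SubPropertiesSketch {
  val INSTANCE_REGEX: Regex = "^(\\*|[a-zA-Z]+)\\.(.+)".r

  def subProperties(prop: Properties, regex: Regex): HashMap[String, Properties] = {
    val grouped = new HashMap[String, Properties]
    import scala.collection.JavaConversions._
    prop.foreach { kv =>
      if (regex.findPrefixOf(kv._1) != None) {
        // "master.sink.console.period" splits into prefix "master" and suffix "sink.console.period"
        val regex(prefix, suffix) = kv._1
        grouped.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
      }
    }
    grouped
  }

  def main(args: Array[String]) {
    val props = new Properties()
    props.setProperty("*.sink.jmx.enabled", "default")
    props.setProperty("master.sink.console.period", "20")
    props.setProperty("master.sink.console.unit", "minute")
    // grouped("*") holds the defaults; grouped("master") holds the master-instance keys
    println(subProperties(props, INSTANCE_REGEX))
  }
}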
......@@ -13,34 +13,37 @@ import spark.metrics.source.Source
private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
val confFile = System.getProperty("spark.metrics.conf.file", "unsupported")
val metricsConfig = new MetricsConfig(confFile)
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
+ val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
+ metricsConfig.initilize()
registerSources()
registerSinks()
def start() {
sinks.foreach(_.start)
}
def stop() {
sinks.foreach(_.stop)
}
def registerSource(source: Source) {
sources += source
registry.register(source.sourceName, source.metricRegistry)
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
- val sourceConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
+ val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
......@@ -52,14 +55,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
}
def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
- val sinkConfigs = MetricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
+ val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
- val classPath = if (MetricsSystem.DEFAULT_SINKS.contains(kv._1)) {
- MetricsSystem.DEFAULT_SINKS(kv._1)
+ val classPath = if (DEFAULT_SINKS.contains(kv._1)) {
+ DEFAULT_SINKS(kv._1)
} else {
// For non-default sinks, a 'class' property should be set and the sink created using reflection
kv._2.getProperty("class")
......@@ -76,17 +79,14 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
private[spark] object MetricsSystem {
- val DEFAULT_SINKS = Map("jmx" -> "spark.metrics.sink.JmxSink")
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
val timeUnits = Map(
"millisecond" -> TimeUnit.MILLISECONDS,
"illisecond" -> TimeUnit.MILLISECONDS,
"second" -> TimeUnit.SECONDS,
"minute" -> TimeUnit.MINUTES,
"hour" -> TimeUnit.HOURS,
"day" -> TimeUnit.DAYS)
- def createMetricsSystem(instance: String) = new MetricsSystem(instance)
+ def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
}
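As the Master and Executor changes earlier in this commit show, a MetricsSystem is created per instance, sources are registered on it, and start/stop bracket its lifetime (Master calls stop from postStop). The sketch below is a minimal illustration under two stated assumptions: the config-file path placeholder is hypothetical, and the classes are visible from where the sketch is compiled (they may be private[spark] and require living under the spark package tree).

import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.MetricsSystem
import spark.metrics.source.Source

object MetricsLifecycleSketch {
  // Hypothetical source used only for this sketch
  class DummySource extends Source {
    val sourceName = "dummy"
    val metricRegistry = new MetricRegistry()
    metricRegistry.register(MetricRegistry.name("answer", "number"), new Gauge[Int] {
      override def getValue: Int = 42
    })
  }

  def main(args: Array[String]) {
    // The config file path is read from this system property, as in the diff;
    // it must be set before createMetricsSystem builds its MetricsConfig.
    System.setProperty("spark.metrics.conf.file", "/path/to/metrics.properties")
    val metricsSystem = MetricsSystem.createMetricsSystem("master")
    metricsSystem.registerSource(new DummySource) // same call Master makes with MasterInstrumentation
    metricsSystem.start()                         // starts every configured sink
    // ... run ...
    metricsSystem.stop()                          // Master does this in postStop
  }
}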
......@@ -8,34 +8,34 @@ import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
- val pollPeriod = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_PERIOD)) match {
+ val CONSOLE_DEFAULT_PERIOD = "10"
+ val CONSOLE_DEFAULT_UNIT = "second"
+ val CONSOLE_KEY_PERIOD = "period"
+ val CONSOLE_KEY_UNIT = "unit"
+ val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => ConsoleSink.CONSOLE_DEFAULT_PERIOD.toInt
+ case None => CONSOLE_DEFAULT_PERIOD.toInt
}
- val pollUnit = Option(property.getProperty(ConsoleSink.CONSOLE_KEY_UNIT)) match {
+ val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(ConsoleSink.CONSOLE_DEFAULT_UNIT)
+ case None => MetricsSystem.timeUnits(CONSOLE_DEFAULT_UNIT)
}
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
- reporter.start(pollPeriod, pollUnit)
+ reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}
- object ConsoleSink {
- val CONSOLE_DEFAULT_PERIOD = "10"
- val CONSOLE_DEFAULT_UNIT = "second"
- val CONSOLE_KEY_PERIOD = "period"
- val CONSOLE_KEY_UNIT = "unit"
- }
......@@ -9,21 +9,29 @@ import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
- val pollPeriod = Option(property.getProperty(CsvSink.CSV_KEY_PERIOD)) match {
+ val CSV_KEY_PERIOD = "period"
+ val CSV_KEY_UNIT = "unit"
+ val CSV_KEY_DIR = "directory"
+ val CSV_DEFAULT_PERIOD = "10"
+ val CSV_DEFAULT_UNIT = "second"
+ val CSV_DEFAULT_DIR = "/tmp/"
+ val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
case Some(s) => s.toInt
- case None => CsvSink.CSV_DEFAULT_PERIOD.toInt
+ case None => CSV_DEFAULT_PERIOD.toInt
}
- val pollUnit = Option(property.getProperty(CsvSink.CSV_KEY_UNIT)) match {
+ val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => MetricsSystem.timeUnits(s)
- case None => MetricsSystem.timeUnits(CsvSink.CSV_DEFAULT_UNIT)
+ case None => MetricsSystem.timeUnits(CSV_DEFAULT_UNIT)
}
- val pollDir = Option(property.getProperty(CsvSink.CSV_KEY_DIR)) match {
+ val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s
- case None => CsvSink.CSV_DEFAULT_DIR
+ case None => CSV_DEFAULT_DIR
}
val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
......@@ -31,21 +39,11 @@ class CsvSink(val property: Properties, val registry: MetricRegistry) extends Si
.build(new File(pollDir))
override def start() {
- reporter.start(pollPeriod, pollUnit)
+ reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}
- object CsvSink {
- val CSV_KEY_PERIOD = "period"
- val CSV_KEY_UNIT = "unit"
- val CSV_KEY_DIR = "directory"
- val CSV_DEFAULT_PERIOD = "10"
- val CSV_DEFAULT_UNIT = "second"
- val CSV_DEFAULT_DIR = "/tmp/"
- }
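Putting the sink changes together: each sink is configured under instance.sink.name.option keys, with period/unit (and directory for CsvSink) options and a class property for any sink that is not the built-in jmx default. Below is an illustrative configuration, not one shipped in this commit: the option keys and the JmxSink/JvmSource class names come from the diff, the values and the directory are examples, and the ConsoleSink/CsvSink fully-qualified names assume both classes live in the same spark.metrics.sink package as JmxSink. The file path would be supplied via the spark.metrics.conf.file system property.

*.sink.jmx.enabled=default
*.source.jvm.class=spark.metrics.source.JvmSource
master.sink.console.class=spark.metrics.sink.ConsoleSink
master.sink.console.period=20
master.sink.console.unit=minute
worker.sink.csv.class=spark.metrics.sink.CsvSink
worker.sink.csv.period=10
worker.sink.csv.unit=second
worker.sink.csv.directory=/tmp/spark-metrics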
......@@ -5,27 +5,26 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
- metricRegistry.register(MetricRegistry.name("stage","failedStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.failed.size
+ metricRegistry.register(MetricRegistry.name("stage", "failedStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.failed.size
})
- metricRegistry.register(MetricRegistry.name("stage","runningStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.running.size
+ metricRegistry.register(MetricRegistry.name("stage", "runningStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.running.size
})
- metricRegistry.register(MetricRegistry.name("stage","waitingStage"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.waiting.size
+ metricRegistry.register(MetricRegistry.name("stage", "waitingStage"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.waiting.size
})
- metricRegistry.register(MetricRegistry.name("job","allJobs"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.nextRunId.get()
+ metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.nextRunId.get()
})
- metricRegistry.register(MetricRegistry.name("job","ActiveJobs"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.activeJobs.size
+ metricRegistry.register(MetricRegistry.name("job", "ActiveJobs"), new Gauge[Int] {
+ override def getValue: Int = dagScheduler.activeJobs.size
})
}
......@@ -6,29 +6,29 @@ import spark.metrics.source.Source
import spark.storage._
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
- val metricRegistry = new MetricRegistry()
+ val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
- metricRegistry.register(MetricRegistry.name("memory","maxMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "maxMem"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+ val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
maxMem
}
})
- metricRegistry.register(MetricRegistry.name("memory","remainingMem"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("memory", "remainingMem"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+ val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
remainingMem
}
})
- metricRegistry.register(MetricRegistry.name("disk","diskSpaceUsed"), new Gauge[Long] {
+ metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
- val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
+ val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).reduceOption(_+_).getOrElse(0L)
diskSpaceUsed
}
})
......
......@@ -9,60 +9,64 @@ import spark.metrics._
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
before {
filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
}
test("MetricsConfig with default properties") {
val conf = new MetricsConfig("dummy-file")
+ conf.initilize()
assert(conf.properties.size() === 2)
assert(conf.properties.getProperty("*.sink.jmx.enabled") === "default")
assert(conf.properties.getProperty("*.source.jvm.class") === "spark.metrics.source.JvmSource")
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
assert(property.size() === 2)
assert(property.getProperty("sink.jmx.enabled") === "default")
assert(property.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
}
test("MetricsConfig with properties set") {
val conf = new MetricsConfig(filePath)
+ conf.initilize()
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 4)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minute")
assert(masterProp.getProperty("sink.jmx.enabled") === "default")
assert(masterProp.getProperty("source.jvm.class") == "spark.metrics.source.JvmSource")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 4)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "second")
}
test("MetricsConfig with subProperties") {
val conf = new MetricsConfig(filePath)
+ conf.initilize()
val propCategories = conf.propertyCategories
assert(propCategories.size === 2)
val masterProp = conf.getInstance("master")
- val sourceProps = MetricsConfig.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
+ val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
assert(sourceProps.size === 1)
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
- val sinkProps = MetricsConfig.subProperties(masterProp, MetricsSystem.SINK_REGEX)
+ val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
assert(sinkProps.size === 2)
assert(sinkProps.contains("console"))
assert(sinkProps.contains("jmx"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
val jmxProps = sinkProps("jmx")
assert(jmxProps.size() === 1)
}
}
}
......@@ -9,32 +9,32 @@ import spark.metrics._
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
System.setProperty("spark.metrics.conf.file", filePath)
}
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
assert(sources.length === 1)
assert(sinks.length === 1)
assert(sources(0).sourceName === "jvm")
}
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
assert(sources.length === 1)
assert(sinks.length === 2)
val source = new spark.deploy.master.MasterInstrumentation(null)
metricsSystem.registerSource(source)
assert(sources.length === 2)
}
}
}