Skip to content
Snippets Groups Projects
Commit 18c4fceb authored by Marcelo Vanzin's avatar Marcelo Vanzin Committed by Andrew Or
Browse files

[SPARK-7169] [CORE] Allow metrics system to be configured through SparkConf.

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Jacek Lewandowski <lewandowski.jacek@gmail.com>

Closes #6560 from vanzin/SPARK-7169 and squashes the following commits:

737266f [Marcelo Vanzin] Feedback.
702d5a3 [Marcelo Vanzin] Scalastyle.
ce66e7e [Marcelo Vanzin] Remove metrics config handling from SparkConf.
439938a [Jacek Lewandowski] SPARK-7169: Metrics can be additionally configured from Spark configuration
parent 5aa804f3
No related branches found
No related tags found
No related merge requests found
......@@ -23,10 +23,10 @@ import java.util.Properties
import scala.collection.mutable
import scala.util.matching.Regex
import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf}
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
private val DEFAULT_PREFIX = "*"
private val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
......@@ -46,23 +46,14 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
// Add default properties in case there's no properties file
setDefaultProperties(properties)
// If spark.metrics.conf is not set, try to get file in class path
val isOpt: Option[InputStream] = configFile.map(new FileInputStream(_)).orElse {
try {
Option(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
} catch {
case e: Exception =>
logError("Error loading default configuration file", e)
None
}
}
loadPropertiesFromFile(conf.getOption("spark.metrics.conf"))
isOpt.foreach { is =>
try {
properties.load(is)
} finally {
is.close()
}
// Also look for the properties in provided Spark configuration
val prefix = "spark.metrics.conf."
conf.getAll.foreach {
case (k, v) if k.startsWith(prefix) =>
properties.setProperty(k.substring(prefix.length()), v)
case _ =>
}
propertyCategories = subProperties(properties, INSTANCE_REGEX)
......@@ -97,5 +88,31 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
}
/**
 * Loads metrics properties either from the file named by `path` or, when no path is
 * configured, from the default metrics configuration resource on the classpath.
 *
 * Any failure while opening or parsing the configuration is logged and swallowed, so the
 * previously-installed default properties remain in effect.
 */
private[this] def loadPropertiesFromFile(path: Option[String]): Unit = {
  var stream: InputStream = null
  try {
    // Prefer the explicitly configured file; otherwise fall back to the bundled resource.
    stream = path
      .map(f => new FileInputStream(f): InputStream)
      .getOrElse(Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME))
    // getResourceAsStream returns null when the resource is absent, so guard before loading.
    Option(stream).foreach(properties.load)
  } catch {
    case e: Exception =>
      logError(s"Error loading configuration file ${path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)}", e)
  } finally {
    if (stream != null) {
      stream.close()
    }
  }
}
}
......@@ -70,8 +70,7 @@ private[spark] class MetricsSystem private (
securityMgr: SecurityManager)
extends Logging {
private[this] val confFile = conf.get("spark.metrics.conf", null)
private[this] val metricsConfig = new MetricsConfig(Option(confFile))
private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
......
......@@ -17,6 +17,8 @@
package org.apache.spark.metrics
import org.apache.spark.SparkConf
import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkFunSuite
......@@ -29,7 +31,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
}
test("MetricsConfig with default properties") {
val conf = new MetricsConfig(None)
val sparkConf = new SparkConf(loadDefaults = false)
sparkConf.set("spark.metrics.conf", "dummy-file")
val conf = new MetricsConfig(sparkConf)
conf.initialize()
assert(conf.properties.size() === 4)
......@@ -42,8 +46,41 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
val conf = new MetricsConfig(Option(filePath))
test("MetricsConfig with properties set from a file") {
  // Point the metrics system at the test properties file through SparkConf.
  val sparkConf = new SparkConf(loadDefaults = false)
  sparkConf.set("spark.metrics.conf", filePath)
  val metricsConf = new MetricsConfig(sparkConf)
  metricsConf.initialize()

  // Expected per-instance properties after merging the file's values with the defaults.
  val expectations = Seq(
    "master" -> Map(
      "sink.console.period" -> "20",
      "sink.console.unit" -> "minutes",
      "source.jvm.class" -> "org.apache.spark.metrics.source.JvmSource",
      "sink.servlet.class" -> "org.apache.spark.metrics.sink.MetricsServlet",
      "sink.servlet.path" -> "/metrics/master/json"),
    "worker" -> Map(
      "sink.console.period" -> "10",
      "sink.console.unit" -> "seconds",
      "source.jvm.class" -> "org.apache.spark.metrics.source.JvmSource",
      "sink.servlet.class" -> "org.apache.spark.metrics.sink.MetricsServlet",
      "sink.servlet.path" -> "/metrics/json"))

  expectations.foreach { case (instance, expected) =>
    val props = metricsConf.getInstance(instance)
    assert(props.size() === expected.size)
    expected.foreach { case (key, value) =>
      assert(props.getProperty(key) === value)
    }
  }
}
test("MetricsConfig with properties set from a Spark configuration") {
val sparkConf = new SparkConf(loadDefaults = false)
setMetricsProperty(sparkConf, "*.sink.console.period", "10")
setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
setMetricsProperty(sparkConf, "master.sink.console.period", "20")
setMetricsProperty(sparkConf, "master.sink.console.unit", "minutes")
val conf = new MetricsConfig(sparkConf)
conf.initialize()
val masterProp = conf.getInstance("master")
......@@ -67,8 +104,40 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set from a file and a Spark configuration") {
  val sparkConf = new SparkConf(loadDefaults = false)
  // SparkConf-provided entries; these should take precedence over the file's values.
  setMetricsProperty(sparkConf, "*.sink.console.period", "10")
  setMetricsProperty(sparkConf, "*.sink.console.unit", "seconds")
  setMetricsProperty(sparkConf, "*.source.jvm.class", "org.apache.spark.SomeOtherSource")
  setMetricsProperty(sparkConf, "master.sink.console.period", "50")
  setMetricsProperty(sparkConf, "master.sink.console.unit", "seconds")
  sparkConf.set("spark.metrics.conf", filePath)

  val metricsConf = new MetricsConfig(sparkConf)
  metricsConf.initialize()

  // Expected merge result: SparkConf entries override the file, which overrides defaults.
  val expectations = Seq(
    "master" -> Map(
      "sink.console.period" -> "50",
      "sink.console.unit" -> "seconds",
      "source.jvm.class" -> "org.apache.spark.SomeOtherSource",
      "sink.servlet.class" -> "org.apache.spark.metrics.sink.MetricsServlet",
      "sink.servlet.path" -> "/metrics/master/json"),
    "worker" -> Map(
      "sink.console.period" -> "10",
      "sink.console.unit" -> "seconds",
      "source.jvm.class" -> "org.apache.spark.SomeOtherSource",
      "sink.servlet.class" -> "org.apache.spark.metrics.sink.MetricsServlet",
      "sink.servlet.path" -> "/metrics/json"))

  expectations.foreach { case (instance, expected) =>
    val props = metricsConf.getInstance(instance)
    assert(props.size() === expected.size)
    expected.foreach { case (key, value) =>
      assert(props.getProperty(key) === value)
    }
  }
}
test("MetricsConfig with subProperties") {
val conf = new MetricsConfig(Option(filePath))
val sparkConf = new SparkConf(loadDefaults = false)
sparkConf.set("spark.metrics.conf", filePath)
val conf = new MetricsConfig(sparkConf)
conf.initialize()
val propCategories = conf.propertyCategories
......@@ -90,4 +159,9 @@ class MetricsConfigSuite extends SparkFunSuite with BeforeAndAfter {
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 2)
}
/** Registers a metrics property on `conf` under the `spark.metrics.conf.` prefix. */
private def setMetricsProperty(conf: SparkConf, name: String, value: String): Unit = {
  conf.set("spark.metrics.conf." + name, value)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment