Commit cd5d0f33 authored by Xiaofeng Lin, committed by jerryshao

[SPARK-11574][CORE] Add metrics StatsD sink

This patch adds a StatsD sink to the existing metrics system in Spark core.

Author: Xiaofeng Lin <xlin@twilio.com>

Closes #9518 from xflin/statsd.

Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44
parent 313c6ca4
@@ -118,6 +118,14 @@
#   prefix    EMPTY STRING  Prefix to prepend to every metric's name
#   protocol  tcp           Protocol ("tcp" or "udp") to use

# org.apache.spark.metrics.sink.StatsdSink
#   Name:     Default:      Description:
#   host      127.0.0.1     Hostname or IP of StatsD server
#   port      8125          Port of StatsD server
#   period    10            Poll period
#   unit      seconds       Units of poll period
#   prefix    EMPTY STRING  Prefix to prepend to metric name
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
@@ -125,6 +133,10 @@
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink

# Enable StatsdSink for all instances by class name
#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
#*.sink.statsd.prefix=spark

# Polling period for the ConsoleSink
#*.sink.console.period=10

# Unit of the polling period for the ConsoleSink
......
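For reference, an uncommented StatsD configuration in conf/metrics.properties might look like the sketch below. Every key and default comes from the option table above; only the "spark" prefix is an illustrative choice rather than a default.

# Sketch: enable StatsdSink for all instances, with the documented defaults spelled out.
*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.host=127.0.0.1
*.sink.statsd.port=8125
*.sink.statsd.period=10
*.sink.statsd.unit=seconds
*.sink.statsd.prefix=spark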
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.sink

import java.io.IOException
import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.SortedMap
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

import com.codahale.metrics._
import org.apache.hadoop.net.NetUtils

import org.apache.spark.internal.Logging
/**
 * @see <a href="https://github.com/etsy/statsd/blob/master/docs/metric_types.md">
 *        StatsD metric types</a>
 */
private[spark] object StatsdMetricType {
  val COUNTER = "c"
  val GAUGE = "g"
  val TIMER = "ms"
  val Set = "s"
}
private[spark] class StatsdReporter(
    registry: MetricRegistry,
    host: String = "127.0.0.1",
    port: Int = 8125,
    prefix: String = "",
    filter: MetricFilter = MetricFilter.ALL,
    rateUnit: TimeUnit = TimeUnit.SECONDS,
    durationUnit: TimeUnit = TimeUnit.MILLISECONDS)
  extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit)
  with Logging {

  import StatsdMetricType._

  private val address = new InetSocketAddress(host, port)
  private val whitespace = "[\\s]+".r

  override def report(
      gauges: SortedMap[String, Gauge[_]],
      counters: SortedMap[String, Counter],
      histograms: SortedMap[String, Histogram],
      meters: SortedMap[String, Meter],
      timers: SortedMap[String, Timer]): Unit =
    Try(new DatagramSocket) match {
      case Failure(ioe: IOException) => logWarning("StatsD datagram socket construction failed",
        NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe))
      case Failure(e) => logWarning("StatsD datagram socket construction failed", e)
      case Success(s) =>
        implicit val socket = s
        val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
        val localPort = socket.getLocalPort
        Try {
          gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue))
          counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue))
          histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue))
          meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue))
          timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue))
        } recover {
          case ioe: IOException =>
            logDebug("Unable to send packets to StatsD", NetUtils.wrapException(
              address.getHostString, address.getPort, localAddress, localPort, ioe))
          case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e)
        }
        Try(socket.close()) recover {
          case ioe: IOException =>
            logDebug("Error when closing socket to StatsD", NetUtils.wrapException(
              address.getHostString, address.getPort, localAddress, localPort, ioe))
          case e: Throwable => logDebug("Error when closing socket to StatsD", e)
        }
    }

  private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit =
    formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE))

  private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit =
    send(fullName(name), format(counter.getCount), COUNTER)

  private def reportHistogram(name: String, histogram: Histogram)
      (implicit socket: DatagramSocket): Unit = {
    val snapshot = histogram.getSnapshot
    send(fullName(name, "count"), format(histogram.getCount), GAUGE)
    send(fullName(name, "max"), format(snapshot.getMax), TIMER)
    send(fullName(name, "mean"), format(snapshot.getMean), TIMER)
    send(fullName(name, "min"), format(snapshot.getMin), TIMER)
    send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER)
    send(fullName(name, "p50"), format(snapshot.getMedian), TIMER)
    send(fullName(name, "p75"), format(snapshot.get75thPercentile), TIMER)
    send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER)
    send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER)
    send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER)
    send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER)
  }

  private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = {
    send(fullName(name, "count"), format(meter.getCount), GAUGE)
    send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER)
    send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER)
    send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER)
    send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER)
  }

  private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = {
    val snapshot = timer.getSnapshot
    send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER)
    send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER)
    send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER)
    send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER)
    send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER)
    send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER)
    send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER)
    send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER)
    send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER)
    send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER)

    reportMetered(name, timer)
  }

  private def send(name: String, value: String, metricType: String)
      (implicit socket: DatagramSocket): Unit = {
    val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
    val packet = new DatagramPacket(bytes, bytes.length, address)
    socket.send(packet)
  }

  private def fullName(names: String*): String = MetricRegistry.name(prefix, names : _*)

  private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-")

  private def format(v: Any): String = formatAny(v).getOrElse("")

  private def formatAny(v: Any): Option[String] =
    v match {
      case f: Float => Some("%2.2f".format(f))
      case d: Double => Some("%2.2f".format(d))
      case b: BigDecimal => Some("%2.2f".format(b))
      case n: Number => Some(n.toString)
      case _ => None
    }
}
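Every metric value the reporter emits is a single plain-text StatsD datagram of the form name:value|type, with whitespace in the name replaced by '-' (see sanitize and send above). Below is a minimal sketch of the payload built for one double-valued gauge; the metric name is made up for illustration and is not part of the patch.

// Sketch only, mirroring fullName/format/send above; "driver.jvm.heap.used" is a hypothetical name.
val prefix = "spark"
val name = MetricRegistry.name(prefix, "driver.jvm.heap.used")
val payload = s"$name:${"%2.2f".format(123.45)}|${StatsdMetricType.GAUGE}"
// payload == "spark.driver.jvm.heap.used:123.45|g", sent to the StatsD address as one UDP packet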
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry

import org.apache.spark.SecurityManager
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem

private[spark] object StatsdSink {
  val STATSD_KEY_HOST = "host"
  val STATSD_KEY_PORT = "port"
  val STATSD_KEY_PERIOD = "period"
  val STATSD_KEY_UNIT = "unit"
  val STATSD_KEY_PREFIX = "prefix"

  val STATSD_DEFAULT_HOST = "127.0.0.1"
  val STATSD_DEFAULT_PORT = "8125"
  val STATSD_DEFAULT_PERIOD = "10"
  val STATSD_DEFAULT_UNIT = "SECONDS"
  val STATSD_DEFAULT_PREFIX = ""
}
private[spark] class StatsdSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager)
  extends Sink with Logging {
  import StatsdSink._

  val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
  val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt

  val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
  val pollUnit =
    TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)

  val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)

  MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

  val reporter = new StatsdReporter(registry, host, port, prefix)

  override def start(): Unit = {
    reporter.start(pollPeriod, pollUnit)
    logInfo(s"StatsdSink started with prefix: '$prefix'")
  }

  override def stop(): Unit = {
    reporter.stop()
    logInfo("StatsdSink stopped.")
  }

  override def report(): Unit = reporter.report()
}
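The sink is normally instantiated reflectively by MetricsSystem from metrics.properties entries, but constructing it directly (as the test suite below does) is a convenient way to see its lifecycle. A sketch, assuming the code lives in the org.apache.spark package since StatsdSink is private[spark]; all property values here are illustrative:

// Sketch: driving StatsdSink by hand, the way the metrics system would.
import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.{SecurityManager, SparkConf}

val props = new Properties
props.put("host", "127.0.0.1")   // STATSD_KEY_HOST
props.put("port", "8125")        // STATSD_KEY_PORT
props.put("period", "10")        // STATSD_KEY_PERIOD, checked against the minimal polling period
props.put("unit", "seconds")     // STATSD_KEY_UNIT
props.put("prefix", "spark")     // STATSD_KEY_PREFIX

val sink = new StatsdSink(props, new MetricRegistry, new SecurityManager(new SparkConf(false)))
sink.start()    // schedules the underlying StatsdReporter
sink.report()   // forces an immediate report
sink.stop()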
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.sink

import java.net.{DatagramPacket, DatagramSocket}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Properties
import java.util.concurrent.TimeUnit._

import com.codahale.metrics._

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.metrics.sink.StatsdSink._

class StatsdSinkSuite extends SparkFunSuite {
  private val securityMgr = new SecurityManager(new SparkConf(false))
  private val defaultProps = Map(
    STATSD_KEY_PREFIX -> "spark",
    STATSD_KEY_PERIOD -> "1",
    STATSD_KEY_UNIT -> "seconds",
    STATSD_KEY_HOST -> "127.0.0.1"
  )
  private val socketTimeout = 30000 // milliseconds
  private val socketBufferSize = 8192

  private def withSocketAndSink(testCode: (DatagramSocket, StatsdSink) => Any): Unit = {
    val socket = new DatagramSocket
    socket.setReceiveBufferSize(socketBufferSize)
    socket.setSoTimeout(socketTimeout)
    val props = new Properties
    defaultProps.foreach(e => props.put(e._1, e._2))
    props.put(STATSD_KEY_PORT, socket.getLocalPort.toString)
    val registry = new MetricRegistry
    val sink = new StatsdSink(props, registry, securityMgr)
    try {
      testCode(socket, sink)
    } finally {
      socket.close()
    }
  }
test("metrics StatsD sink with Counter") {
withSocketAndSink { (socket, sink) =>
val counter = new Counter
counter.inc(12)
sink.registry.register("counter", counter)
sink.report()
val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
socket.receive(p)
val result = new String(p.getData, 0, p.getLength, UTF_8)
assert(result === "spark.counter:12|c", "Counter metric received should match data sent")
}
}
test("metrics StatsD sink with Gauge") {
withSocketAndSink { (socket, sink) =>
val gauge = new Gauge[Double] {
override def getValue: Double = 1.23
}
sink.registry.register("gauge", gauge)
sink.report()
val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
socket.receive(p)
val result = new String(p.getData, 0, p.getLength, UTF_8)
assert(result === "spark.gauge:1.23|g", "Gauge metric received should match data sent")
}
}
test("metrics StatsD sink with Histogram") {
withSocketAndSink { (socket, sink) =>
val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
val histogram = new Histogram(new UniformReservoir)
histogram.update(10)
histogram.update(20)
histogram.update(30)
sink.registry.register("histogram", histogram)
sink.report()
val expectedResults = Set(
"spark.histogram.count:3|g",
"spark.histogram.max:30|ms",
"spark.histogram.mean:20.00|ms",
"spark.histogram.min:10|ms",
"spark.histogram.stddev:10.00|ms",
"spark.histogram.p50:20.00|ms",
"spark.histogram.p75:30.00|ms",
"spark.histogram.p95:30.00|ms",
"spark.histogram.p98:30.00|ms",
"spark.histogram.p99:30.00|ms",
"spark.histogram.p999:30.00|ms"
)
(1 to expectedResults.size).foreach { i =>
socket.receive(p)
val result = new String(p.getData, 0, p.getLength, UTF_8)
logInfo(s"Received histogram result $i: '$result'")
assert(expectedResults.contains(result),
"Histogram metric received should match data sent")
}
}
}
test("metrics StatsD sink with Timer") {
withSocketAndSink { (socket, sink) =>
val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
val timer = new Timer()
timer.update(1, SECONDS)
timer.update(2, SECONDS)
timer.update(3, SECONDS)
sink.registry.register("timer", timer)
sink.report()
val expectedResults = Set(
"spark.timer.max:3000.00|ms",
"spark.timer.mean:2000.00|ms",
"spark.timer.min:1000.00|ms",
"spark.timer.stddev:816.50|ms",
"spark.timer.p50:2000.00|ms",
"spark.timer.p75:3000.00|ms",
"spark.timer.p95:3000.00|ms",
"spark.timer.p98:3000.00|ms",
"spark.timer.p99:3000.00|ms",
"spark.timer.p999:3000.00|ms",
"spark.timer.count:3|g",
"spark.timer.m1_rate:0.00|ms",
"spark.timer.m5_rate:0.00|ms",
"spark.timer.m15_rate:0.00|ms"
)
// mean rate varies on each test run
val oneMoreResult = """spark.timer.mean_rate:\d+\.\d\d\|ms"""
(1 to (expectedResults.size + 1)).foreach { i =>
socket.receive(p)
val result = new String(p.getData, 0, p.getLength, UTF_8)
logInfo(s"Received timer result $i: '$result'")
assert(expectedResults.contains(result) || result.matches(oneMoreResult),
"Timer metric received should match data sent")
}
}
}
}
@@ -455,6 +455,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
* `GraphiteSink`: Sends metrics to a Graphite node.
* `Slf4jSink`: Sends metrics to slf4j as log entries.
* `StatsdSink`: Sends metrics to a StatsD node.
Spark also supports a Ganglia sink which is not included in the default build due to
licensing restrictions:
......