Skip to content
Snippets Groups Projects
Commit e9faae13 authored by Tathagata Das's avatar Tathagata Das
Browse files

[SPARK-21409][SS] Follow up PR to allow different types of custom metrics to be exposed

## What changes were proposed in this pull request?

Implementations may expose both timing and size metrics. This PR enables that.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #18661 from tdas/SPARK-21409-2.
parent 7aac755b
No related branches found
No related tags found
No related merge requests found
......@@ -92,6 +92,10 @@ trait StateStore {
*/
def abort(): Unit
/**
 * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
 * ensure that updates (puts, removes) can be made while iterating over this iterator.
 */
def iterator(): Iterator[UnsafeRowPair]
/** Current metrics of the state store */
......@@ -120,7 +124,12 @@ case class StateStoreMetrics(
* Name and description of custom implementation-specific metrics that a
* state store may wish to expose.
*/
case class StateStoreCustomMetric(name: String, desc: String)
trait StateStoreCustomMetric {
def name: String
def desc: String
}
case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
/**
* Trait representing a provider that provide [[StateStore]] instances representing
......
......@@ -104,7 +104,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
* This should be called in that task after the store has been updated.
*/
protected def setStoreMetrics(store: StateStore): Unit = {
val storeMetrics = store.metrics
longMetric("numTotalStateRows") += storeMetrics.numKeys
longMetric("stateMemory") += storeMetrics.memoryUsedBytes
......@@ -115,8 +114,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
/**
 * Builds the SQL metrics for the custom metrics supported by the configured
 * state store provider, keyed by metric name. A size metric and a timing
 * metric are created through the corresponding SQLMetrics factory so each
 * renders appropriately in the UI.
 *
 * NOTE(review): StateStoreCustomMetric is not sealed, so this match is not
 * exhaustive — a new subtype added elsewhere would throw MatchError here.
 */
private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
  // Instantiate the configured provider only to query its supported metrics.
  val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
  provider.supportedCustomMetrics.map {
    case StateStoreCustomSizeMetric(name, desc) =>
      name -> SQLMetrics.createSizeMetric(sparkContext, desc)
    case StateStoreCustomTimingMetric(name, desc) =>
      name -> SQLMetrics.createTimingMetric(sparkContext, desc)
  }.toMap
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment