diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 9da610e359f90d1b3ac8ec7f46f59ef7ef6fdbd5..182fc2779e7ab02cab0db1c2003f4395d996a685 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -92,6 +92,10 @@ trait StateStore {
    */
   def abort(): Unit
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   */
   def iterator(): Iterator[UnsafeRowPair]
 
   /** Current metrics of the state store */
@@ -120,7 +124,12 @@ case class StateStoreMetrics(
  * Name and description of custom implementation-specific metrics that a
  * state store may wish to expose.
  */
-case class StateStoreCustomMetric(name: String, desc: String)
+trait StateStoreCustomMetric {
+  def name: String
+  def desc: String
+}
+case class StateStoreCustomSizeMetric(name: String, desc: String) extends StateStoreCustomMetric
+case class StateStoreCustomTimingMetric(name: String, desc: String) extends StateStoreCustomMetric
 
 /**
  * Trait representing a provider that provide [[StateStore]] instances representing
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77b1160a063fbc23f3cdd4e57b0a187b6208a341..3ca7f4b145d719a3a12e4d60c714086f42978cc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -104,7 +104,6 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
    * This should be called in that task after the store has been updated.
    */
   protected def setStoreMetrics(store: StateStore): Unit = {
-
     val storeMetrics = store.metrics
     longMetric("numTotalStateRows") += storeMetrics.numKeys
     longMetric("stateMemory") += storeMetrics.memoryUsedBytes
@@ -115,8 +114,12 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
 
   private def stateStoreCustomMetrics: Map[String, SQLMetric] = {
     val provider = StateStoreProvider.create(sqlContext.conf.stateStoreProviderClass)
-    provider.supportedCustomMetrics.map { m =>
-      m.name -> SQLMetrics.createTimingMetric(sparkContext, m.desc) }.toMap
+    provider.supportedCustomMetrics.map {
+      case StateStoreCustomSizeMetric(name, desc) =>
+        name -> SQLMetrics.createSizeMetric(sparkContext, desc)
+      case StateStoreCustomTimingMetric(name, desc) =>
+        name -> SQLMetrics.createTimingMetric(sparkContext, desc)
+    }.toMap
   }
 }
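
To illustrate how a provider would adopt the split metric types, here is a minimal provider-side sketch. It is not part of this patch: `ExampleProviderMetrics`, the metric names, and the sample values are hypothetical, and it assumes `StateStoreMetrics(numKeys, memoryUsedBytes, customMetrics)` exposes the custom values as a `Map[StateStoreCustomMetric, Long]`.

```scala
import org.apache.spark.sql.execution.streaming.state.{
  StateStoreCustomMetric, StateStoreCustomSizeMetric, StateStoreCustomTimingMetric,
  StateStoreMetrics}

// Hypothetical example, not part of this patch: a provider declaring one
// metric of each kind. The names and values are illustrative only.
object ExampleProviderMetrics {
  // A size metric: stateStoreCustomMetrics above registers it via
  // SQLMetrics.createSizeMetric, so the UI renders the value as bytes.
  val loadedMapSize: StateStoreCustomMetric = StateStoreCustomSizeMetric(
    "loadedMapSize", "estimated size of the in-memory state map")

  // A timing metric: registered via SQLMetrics.createTimingMetric, so the
  // UI renders the value as a duration.
  val commitTime: StateStoreCustomMetric = StateStoreCustomTimingMetric(
    "commitTime", "time taken to commit state updates")

  // What the provider would return from supportedCustomMetrics.
  val all: Seq[StateStoreCustomMetric] = Seq(loadedMapSize, commitTime)

  // What a store's metrics() call might report after one batch, assuming the
  // StateStoreMetrics shape described above.
  def metricsFor(numKeys: Long, mapBytes: Long, commitMs: Long): StateStoreMetrics =
    StateStoreMetrics(numKeys, mapBytes,
      Map(loadedMapSize -> mapBytes, commitTime -> commitMs))
}
```

The only behavioral difference between the two case classes is how `stateStoreCustomMetrics` registers them: size metrics go through `SQLMetrics.createSizeMetric` and timing metrics through `SQLMetrics.createTimingMetric`, so each is formatted appropriately in the SQL metrics UI.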