-
- Downloads
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request? Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics. https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing Specifically, this PR adds the following public APIs changes. ### New APIs - `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later) - `StreamingQueryStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by all the sources - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources - ~~outputRate~~ - *Does not work with wholestage codegen* - latency - Current average latency between the data being available in source and the sink writing the corresponding output - sourceStatuses: Array[SourceStatus] - Current statuses of the sources - sinkStatus: SinkStatus - Current status of the sink - triggerStatus - Low-level detailed status of the last completed/currently active trigger - latencies - getOffset, getBatch, full trigger, wal writes - timestamps - trigger start, finish, after getOffset, after getBatch - numRows - input, output, state total/updated rows for aggregations - `SourceStatus` has the following important fields - inputRate - Current rate (rows/sec) at which data is being generated by the source - processingRate - Current rate (rows/sec) at which the query is processing data from the source - triggerStatus - Low-level detailed status of the last completed/currently active trigger - Python API for `StreamingQuery.status()` ### Breaking changes to existing APIs **Existing direct public facing APIs** - Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`. - Branch 2.0 should have it deprecated, master should have it removed. **Existing advanced listener APIs** - `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus` - Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status) - Field `queryInfo` in listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed have name `queryStatus` and return type `StreamingQueryStatus`. - Field `offsetDesc` in `SourceStatus` was Option[String], converted it to `String`. - For `SourceStatus` and `SinkStatus` made constructor private instead of private[sql] to make them more java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()` which are harder to accidentally use in Java. ## How was this patch tested? Old and new unit tests. - Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite. - New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite. - New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite. - Source-specific tests for making sure input rows are counted are is source-specific test suites. - Additional tests to test minor additions in LocalTableScanExec, StateStore, etc. Metrics also manually tested using Ganglia sink Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #15307 from tdas/SPARK-17731.
Showing
- external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala 27 additions, 0 deletions...cala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
- project/MimaExcludes.scala 13 additions, 0 deletionsproject/MimaExcludes.scala
- python/pyspark/sql/streaming.py 301 additions, 0 deletionspython/pyspark/sql/streaming.py
- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala 7 additions, 0 deletions.../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala 4 additions, 1 deletion...a/org/apache/spark/sql/execution/LocalTableScanExec.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala 30 additions, 1 deletion...che/spark/sql/execution/streaming/StatefulAggregate.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala 236 additions, 71 deletions...pache/spark/sql/execution/streaming/StreamExecution.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala 242 additions, 0 deletions.../apache/spark/sql/execution/streaming/StreamMetrics.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 7 additions, 0 deletions...ala/org/apache/spark/sql/execution/streaming/memory.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala 2 additions, 0 deletions...cution/streaming/state/HDFSBackedStateStoreProvider.scala
- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala 3 additions, 0 deletions...ache/spark/sql/execution/streaming/state/StateStore.scala
- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 8 additions, 0 deletions...rc/main/scala/org/apache/spark/sql/internal/SQLConf.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala 22 additions, 6 deletions...ain/scala/org/apache/spark/sql/streaming/SinkStatus.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala 48 additions, 6 deletions...n/scala/org/apache/spark/sql/streaming/SourceStatus.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala 12 additions, 1 deletion...scala/org/apache/spark/sql/streaming/StreamingQuery.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryInfo.scala 0 additions, 37 deletions...a/org/apache/spark/sql/streaming/StreamingQueryInfo.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala 4 additions, 4 deletions...g/apache/spark/sql/streaming/StreamingQueryListener.scala
- sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala 139 additions, 0 deletions...org/apache/spark/sql/streaming/StreamingQueryStatus.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala 17 additions, 0 deletions...g/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
- sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala 213 additions, 0 deletions...he/spark/sql/execution/streaming/StreamMetricsSuite.scala
Loading
Please register or sign in to comment