
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0 #15472

Closed
wants to merge 2 commits

Conversation


@tdas tdas commented Oct 13, 2016

This PR adds to branch-2.0 the same metrics that were added to master in #15307.

The differences compared to #15307 are:

  • The new configuration is added only in the SQLConf object (i.e. SQLConf.STREAMING_METRICS_ENABLED) and not in the SQLConf class (i.e. no SQLConf.isStreamingMetricsEnabled). Spark master exposes all the streaming configurations as actual fields in the SQLConf class (e.g. streamingPollingDelay), but Spark 2.0 does not, so I didn't add it in this 2.0 PR.
  • In the previous master PR, the above configuration was read in StreamExecution as sparkSession.sessionState.conf.isStreamingMetricsEnabled. In this 2.0 PR, I am instead reading it as sparkSession.conf.get(STREAMING_METRICS_ENABLED) (i.e. no sessionState) to keep it consistent with how other confs are read in StreamExecution (e.g. STREAMING_POLLING_DELAY).
  • Different Mima exclusions
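The conf-reading difference in the bullets above can be sketched as follows. ConfEntry, SQLConfEntries, and readConf are illustrative stand-ins for this sketch only, not Spark's actual internal types:

```scala
// Hypothetical stand-ins (ConfEntry, SQLConfEntries, readConf are NOT Spark's
// real internal types): a conf entry defined on an object and read straight
// from a key/value map, with no sessionState accessor involved.
final case class ConfEntry(key: String, default: Boolean)

object SQLConfEntries {
  val STREAMING_METRICS_ENABLED: ConfEntry =
    ConfEntry("spark.sql.streaming.metricsEnabled", default = false)
}

// Mirrors the shape of sparkSession.conf.get(STREAMING_METRICS_ENABLED):
// look up the key, fall back to the entry's default when it is unset.
def readConf(settings: Map[String, String], entry: ConfEntry): Boolean =
  settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)
```

The point of the second bullet is only the access path: the 2.0 code reads the entry directly off the session conf rather than through a generated sessionState field.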

What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

  • StreamingQuery.status returns a StreamingQueryStatus object (renamed from StreamingQueryInfo, see later)
  • StreamingQueryStatus has the following important fields
    • inputRate - Current rate (rows/sec) at which data is being generated by all the sources
    • processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
    • outputRate - (removed; does not work with whole-stage codegen)
    • latency - Current average latency between the data being available in source and the sink writing the corresponding output
    • sourceStatuses: Array[SourceStatus] - Current statuses of the sources
    • sinkStatus: SinkStatus - Current status of the sink
    • triggerStatus - Low-level detailed status of the last completed/currently active trigger
      • latencies - getOffset, getBatch, full trigger, wal writes
      • timestamps - trigger start, finish, after getOffset, after getBatch
      • numRows - input, output, state total/updated rows for aggregations
  • SourceStatus has the following important fields
    • inputRate - Current rate (rows/sec) at which data is being generated by the source
    • processingRate - Current rate (rows/sec) at which the query is processing data from the source
    • triggerStatus - Low-level detailed status of the last completed/currently active trigger
  • Python API for StreamingQuery.status()
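For intuition about the inputRate and processingRate fields above, a minimal rows/sec computation looks like the sketch below. This is not Spark's actual StreamMetrics implementation, just the core idea:

```scala
// Minimal sketch of a rows/sec rate like inputRate and processingRate above;
// NOT Spark's actual StreamMetrics code: rows handled in an interval divided
// by the interval's wall-clock duration in seconds.
def rowsPerSecond(numRows: Long, startMs: Long, endMs: Long): Double = {
  val elapsedSec = (endMs - startMs) / 1000.0
  if (elapsedSec <= 0) 0.0 else numRows / elapsedSec
}
```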

Breaking changes to existing APIs

Existing direct public facing APIs

  • Deprecated direct public-facing APIs StreamingQuery.sourceStatuses and StreamingQuery.sinkStatus in favour of StreamingQuery.status.sourceStatuses/sinkStatus.
    • Branch-2.0 should have them deprecated; master should have them removed.

Existing advanced listener APIs

  • StreamingQueryInfo renamed to StreamingQueryStatus for consistency with SourceStatus, SinkStatus
    • Earlier StreamingQueryInfo was used only in the advanced listener API, but now it is used in direct public-facing API (StreamingQuery.status)
  • Field queryInfo in the listener events QueryStarted, QueryProgress, and QueryTerminated was renamed to queryStatus, with return type StreamingQueryStatus.
  • Field offsetDesc in SourceStatus was changed from Option[String] to String.
  • For SourceStatus and SinkStatus, the constructors were made private (instead of private[sql]) to make them more Java-safe, and a private[sql] SourceStatus.apply()/SinkStatus.apply() was added instead, which is harder to accidentally use from Java.
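The Java-safety change in the last bullet follows a standard Scala pattern, sketched below with an illustrative StatusLike class rather than the real SourceStatus/SinkStatus (the companion apply is public here only so the sketch compiles standalone; in the PR it is private[sql]):

```scala
// Illustrative class (NOT Spark's real SourceStatus/SinkStatus). A private
// constructor cannot be called from Java at all, whereas a private[sql]
// constructor is public in bytecode and therefore callable from Java by
// accident.
class StatusLike private (val description: String, val inputRate: Double)

object StatusLike {
  // In the PR this factory is private[sql]; public here only so the sketch
  // is self-contained.
  def apply(description: String, inputRate: Double): StatusLike =
    new StatusLike(description, inputRate)
}
```

From Scala, `StatusLike("kafka-source", 42.0)` works as before, while Java callers see no usable constructor, which is the intended safety improvement.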

How was this patch tested?

Old and new unit tests.

  • Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite.
  • New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
  • New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
  • Source-specific tests for making sure input rows are counted are in source-specific test suites.
  • Additional tests to test minor additions in LocalTableScanExec, StateStore, etc.

Metrics were also manually tested using the Ganglia sink.


Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#15307 from tdas/SPARK-17731.
@tdas tdas changed the title [SPARK-17731][SQL][STREAMING] Metrics for structured streaming [SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0 Oct 13, 2016
@SparkQA

SparkQA commented Oct 13, 2016

Test build #66916 has finished for PR 15472 at commit ee8e899.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -77,6 +77,9 @@ trait StateStore {
*/
def updates(): Iterator[StoreUpdate]

/** Number of keys in the state store */
def numKeys(): Long
Contributor:
this should be defined with parentheses, shouldn't it?

Contributor Author:

This is an internal API, so it doesn't really matter. I can change it in the follow-up PR.

@@ -84,27 +84,27 @@ object StreamingQueryListener {
* @since 2.0.0
*/
@Experimental
class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event
class QueryStarted private[sql](val queryStatus: StreamingQueryStatus) extends Event
Contributor:

isn't this an api breaking change?

Contributor:

I'm fine with changing this, but I'd rename this just "status", and the other ones "event" for the function arguments in StreamingQueryListener.

Contributor Author:

Okay. I will do that in a follow-up PR after this PR goes in and makes branch-2.0 consistent with master.

@tdas
Contributor Author

tdas commented Oct 17, 2016

@rxin @marmbrus I am thinking of merging this PR (with the ignored flaky test) to make it consistent with master, so that follow-up PRs (the flaky test fix, further API changes) can be merged on both master and branch-2.0. Any objections?

@SparkQA

SparkQA commented Oct 17, 2016

Test build #67086 has finished for PR 15472 at commit 1a32b39.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Oct 17, 2016

^^ The test failure above seems to be unrelated.

@SparkQA

SparkQA commented Oct 17, 2016

Test build #3357 has finished for PR 15472 at commit 1a32b39.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor Author

tdas commented Oct 17, 2016

I am merging this to branch-2.0.

Follow up PR to fix the flaky test is #15519

asfgit pushed a commit that referenced this pull request Oct 17, 2016
…anch-2.0


Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15472 from tdas/SPARK-17731-branch-2.0.
@tdas
Contributor Author

tdas commented Oct 18, 2016

I have merged it to branch-2.0. Closing the PR.

@tdas tdas closed this Oct 18, 2016