[SPARK-17731][SQL][STREAMING] Metrics for structured streaming #15307
Conversation
Test build #66149 has finished for PR 15307 at commit
Test build #66166 has finished for PR 15307 at commit
Test build #66167 has finished for PR 15307 at commit
Test build #66183 has finished for PR 15307 at commit
Test build #66184 has finished for PR 15307 at commit
```scala
/* Get the call site in the caller thread; will pass this into the micro batch thread */
private val callSite = Utils.getCallSite()

private val streamMetrics = new StreamMetrics(uniqueSources.toSet, triggerClock,
  "%s.StructuredStreamingMetrics.%s".format(sparkSession.sparkContext.appName, name))
```
nit: `s"${sparkSession.sparkContext.appName}.StructuredStreamingMetrics.$name"`.
Changing the format. Tested on Ganglia and decided against putting the app name inside, since we already have the query name.
```scala
def reportNumRows(inputRows: Map[Source, Long], outputRows: Option[Long]): Unit = synchronized {
  numInputRows ++= inputRows
  numOutputRows = outputRows
```
`numOutputRows += outputRows`?
No, it's `numOutputRows` in the current trigger only, so assignment is intended.
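For readers following along, here is a minimal sketch of that distinction; the class and field names are simplified stand-ins for the `StreamMetrics` internals, with sources stubbed as strings:

```scala
import scala.collection.mutable

// Minimal sketch of the bookkeeping discussed above, not the actual class.
class TriggerRowCounts {
  private val numInputRows = mutable.Map[String, Long]() // accumulated per source
  private var numOutputRows: Option[Long] = None         // current trigger only

  def reportNumRows(inputRows: Map[String, Long], outputRows: Option[Long]): Unit =
    synchronized {
      numInputRows ++= inputRows // merge the latest per-source counts
      numOutputRows = outputRows // plain assignment: replaced every trigger
    }

  def snapshot: (Map[String, Long], Option[Long]) =
    synchronized { (numInputRows.toMap, numOutputRows) }
}
```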
```diff
@@ -511,12 +555,59 @@ class StreamExecution(
       """.stripMargin
   }

+  private def reportMetrics(executedPlan: SparkPlan): Unit = {
+    val execPlanLeaves = executedPlan.collect { case p if p.children.isEmpty => p }
+    val sourceToNumInputRows = if (execPlanLeaves.size == sources.size) {
```
Is it possible that `execPlanLeaves` contains sources of a batch DF?
That is true, it's possible. Good point; this needs to be more complex then.
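One conservative way to handle the mixed batch/streaming case, sketched under assumed names (this illustrates the concern, not the fix that eventually landed):

```scala
import org.apache.spark.sql.execution.SparkPlan

// A query that joins a batch DataFrame contributes extra leaves, so
// leaf count == source count is not a safe precondition on its own.
def attributeInputRows(executedPlan: SparkPlan, numSources: Int): Option[Seq[(Int, Long)]] = {
  val leaves = executedPlan.collect { case p if p.children.isEmpty => p }
  if (leaves.size == numSources) {
    // Assumes leaf i corresponds to source i, which holds only when no
    // batch-side leaves are present in the plan.
    Some(leaves.zipWithIndex.map { case (leaf, i) =>
      i -> leaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
    })
  } else {
    None // mixed batch/streaming plan: per-source attribution is ambiguous
  }
}
```

Skipping attribution when the shapes disagree loses per-source counts for mixed plans, but never mis-attributes batch-side rows to a streaming source.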
Test build #66192 has finished for PR 15307 at commit
Test build #66262 has finished for PR 15307 at commit
```scala
} else {
  streamMetrics.reportTriggerInfo(STATUS_MESSAGE, "No new data")
  streamMetrics.reportTriggerInfo(DATA_AVAILABLE, false)
  Thread.sleep(100)
```
Why is this sleep amount hardcoded?
Well, it was hardcoded before. I can make it a SQLConf.
Unless I'm misreading, it was using `pollingDelayMs` before?
Fixed it. I goofed up while merging with master.
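A small, self-contained illustration of the fix being described; the conf key mirrors `spark.sql.streaming.pollingDelay`, and the plain `Map` stands in for SQLConf rather than being the actual `StreamExecution` wiring:

```scala
// Pull the no-data sleep from a configurable polling delay instead of
// hardcoding 100 ms.
object PollingDelayExample {
  def main(args: Array[String]): Unit = {
    val conf = Map("spark.sql.streaming.pollingDelay" -> "10")
    val pollingDelayMs = conf.getOrElse("spark.sql.streaming.pollingDelay", "10").toLong
    // Inside the trigger loop, when no new data is available:
    Thread.sleep(pollingDelayMs) // was Thread.sleep(100)
    println(s"slept $pollingDelayMs ms")
  }
}
```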
```scala
    (s, s.getOffset)
  }
}.toMap
availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty }.mapValues(_.get)
```
I'm confused as to why this was changed to a separate filter step.
I just converted the `flatMap` over `Option`s into filtering out the `None`s and then `get`. It seemed marginally easier to understand than the `flatMap`, but I am willing to change it back if this is more confusing.
Just wanted to make sure I wasn't missing something. There's later code that uses flatMap instead of a separate filter; maybe better to be consistent one way or the other, but not a big deal.
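For reference, the two forms are equivalent; a toy example:

```scala
// Both forms drop the None entries and unwrap the rest.
val latestOffsets = Map("sourceA" -> Some(5L), "sourceB" -> None)

// filter-then-get, as in the patch:
val viaFilter = latestOffsets.filter { case (_, o) => o.nonEmpty }.mapValues(_.get)

// flatMap over the Options, as elsewhere in the file:
val viaFlatMap = latestOffsets.flatMap { case (s, o) => o.map(s -> _) }

assert(viaFilter.toMap == viaFlatMap.toMap) // Map(sourceA -> 5)
```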
```scala
// =========== Initialization ===========

// Metric names should not have . in them, so that all the metrics of a query are identified
// together in Ganglia as a single metric group
```
I'm confused by this comment. Shouldn't metrics be agnostic as to the type of sink, not just Ganglia? Are hyphens valid in identifier names for all currently used sinks?
Metrics are generally agnostic to sinks, but different sinks probably have different ways of representing data visually. I tested with Ganglia (assuming that it's the most common one in production) and found that it clusters based on the last period, so multiple metric names A.B.X, A.B.Y, A.B.Z will be put in a cluster A.B.

Regarding validity, we use Codahale's `Metrics.name(partialName1, partialName2, ...)` underneath, which is supposed to check formats and all that stuff. So I am assuming that whatever passes through that check will be valid for all sinks. And the hyphen seems to be fine with the Ganglia sink.
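To illustrate the naming scheme (Codahale's concrete API is `MetricRegistry.name`; the `-` replacement character is an assumption for illustration):

```scala
import com.codahale.metrics.MetricRegistry

// MetricRegistry.name joins its parts with ".", and Ganglia groups on the
// last ".", so dots inside the query name itself are stripped first to keep
// one query's metrics in a single group.
val queryName = "my.query name"
val safeName = queryName.replaceAll("[ .]", "-") // "my-query-name"
val metricName = MetricRegistry.name("StructuredStreamingMetrics", safeName, "inputRate")
// => "StructuredStreamingMetrics.my-query-name.inputRate"
```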
```scala
def currentInputRate(): Double = synchronized {
  // Since we are calculating source input rates using the same time interval for all sources
  // it is fine to calculate total input rate as the sum of per source input rate.
  inputRates.map(_._2.currentRate).sum
```
Is there a reason not to just make `RateCalculator` monoidal so it sums correctly, even if you later changed to different time intervals per source?
Could you elaborate further with an example?
If `RateCalculator` has a zero and an addition method, then you can do `inputRates.map(_._2).foldLeft(RateCalculator.zero)(_ + _)` and not have to rely on the comment about using `currentRate` being safe because the time intervals are the same.

If you don't want to mess with greatest common divisors or whatever until you have an actual different interval, you can just make the addition method throw unless the intervals in the two rate objects are the same.
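A minimal sketch of that suggestion, with all names hypothetical:

```scala
// Monoidal rate sketch: zero is the identity, and addition throws when
// intervals differ, as proposed above.
case class Rate(rows: Long, intervalMs: Long) {
  def currentRate: Double = if (intervalMs == 0) 0.0 else rows * 1000.0 / intervalMs
  def +(that: Rate): Rate =
    if (this == Rate.zero) that
    else if (that == Rate.zero) this
    else {
      require(intervalMs == that.intervalMs, "cannot sum rates over different intervals")
      Rate(rows + that.rows, intervalMs)
    }
}
object Rate { val zero: Rate = Rate(0L, 0L) }

val inputRates = Map("sourceA" -> Rate(100, 1000), "sourceB" -> Rate(50, 1000))
val total = inputRates.values.foldLeft(Rate.zero)(_ + _) // Rate(150, 1000) => 150.0 rows/sec
```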
> using the same time interval for all sources

Actually, I noticed that this is not true right now. There is a race condition, since source infos are updated separately. Then it's possible that when this method is called, some sources have the new info while the others still have the info for the last batch. Right?
How about updating all sources together in one method?
For the record: resolved. Synchronization has been simplified by the stream execution thread updating a volatile reference to the query status every time any of the components used in the status is updated. `StreamingQuery.status` only reads the current value of the volatile reference. See `updateStatus()` in the PR.
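A simplified sketch of that resolution; the status fields and method names here are assumed, not the actual `StreamExecution` code:

```scala
final case class QueryStatus(batchId: Long, inputRate: Double, message: String)

class StatusHolder {
  @volatile private var current = QueryStatus(-1L, 0.0, "Initializing")

  // Called only from the stream execution thread whenever any component
  // of the status changes: rebuild and republish an immutable snapshot.
  def updateStatus(batchId: Long, rate: Double, message: String): Unit = {
    current = QueryStatus(batchId, rate, message)
  }

  // Readers on any thread get a consistent snapshot with no locking.
  def status: QueryStatus = current
}
```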
2 general questions
@tdas I wasn't asking if this patch was intended to implement rate limiting, I was asking if it was intended to be used to support the implementation of rate limiting in the future. In other words, when you do rate limiting, are you going to use these metrics, or collect / expose what is essentially the same data in a different way?
I believe we would be using some of these metrics internally, and expose additional metrics through the same StreamingQueryInfo interface. But we have not put much thought into the rate limiting, so I can't give a more concrete comment than "yes, we would use as much of the existing stuff as possible in the future".
Test build #66269 has finished for PR 15307 at commit
```diff
@@ -56,7 +57,12 @@ case class StateStoreRestoreExec(
     child: SparkPlan)
   extends execution.UnaryExecNode with StatefulOperator {

+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
```
The metric names should probably be in a separate, centralized list of constants. Users will want a single place in the API docs to find a list of all available metrics, and the list is likely to change quite frequently as Structured Streaming evolves.

Edit: On second thought, it looks like these string literals are for internal use, and the user-visible constants are at the bottom of `StreamMetrics.scala`.
Overall, looks like a decent implementation of metrics for the current single-threaded, non-pipelined scheduler operating over relatively large batches. Some aspects of the current implementation will need to change down the line, but it doesn't make sense to over-design at this point in development.
```diff
 /** Returns current status of all the sources. */
 override def sourceStatuses: Array[SourceStatus] = {
   val localAvailableOffsets = availableOffsets
   sources.map(s =>
-    new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
+    new SourceStatus(
```
If this method is intended to be called from threads other than the scheduler thread, then the entire map really ought to be synchronized on `streamMetrics`'s lock. Otherwise this method could return a mixture of statistics from different points of time, even within a single source.
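A reduced sketch of the suggested locking discipline, under assumed names:

```scala
import scala.collection.mutable

// Build the entire status snapshot inside one synchronized block so every
// source is sampled at the same instant, instead of each read racing with
// the scheduler thread.
class MetricsStore {
  private val offsets = mutable.Map[String, Long]()
  private val rates = mutable.Map[String, Double]()

  def update(source: String, offset: Long, rate: Double): Unit = synchronized {
    offsets(source) = offset
    rates(source) = rate
  }

  def sourceStatuses(sources: Seq[String]): Seq[(String, Option[Long], Option[Double])] =
    synchronized { // one lock acquisition covers the whole snapshot
      sources.map(s => (s, offsets.get(s), rates.get(s)))
    }
}
```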
Yeah, you are probably right. Probably have to add `synchronized` to a lot of methods. :(
Actually, you can probably drop most of the synchronization if you keep two `StreamMetrics` objects and preallocate the slots for counters. At least the way things are now, each counter in `StreamMetrics` is written once per batch. If you tweak `sourceStatuses()` to return the metrics from the most recent completed batch (i.e. the `StreamMetrics` object that's not currently being written to), there should be no overlap between readers and writers. Eventually you'll want to have more than one `StreamMetrics` object anyway, since the scheduler will need to pipeline multiple batches to reach latencies below the 50-100ms level.
That's not sufficient. We also have to lock changes to the `committedOffsets` and `availableOffsets` while generating the status.
```diff
@@ -317,15 +358,18 @@ class StreamExecution(
     // TODO: Move this to IncrementalExecution.

     // Request unprocessed data from all sources.
-    val newData = availableOffsets.flatMap {
-      case (source, available)
+    val newData = timeIt(GET_BATCH_LATENCY) {
```
Note that the time interval being measured here will have different semantics for different sources, depending on how much computation occurs inside the source's `getBatch` method vs. lazily when the data is read from the resulting DataFrame.
Yeah. The intention in `GET_BATCH_LATENCY` is to measure the time taken in the non-lazy part.
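A sketch of a `timeIt` helper consistent with this usage; the metric-store plumbing and key name are assumptions, not the actual code:

```scala
import scala.collection.mutable

// Captures only the eager part of the body: work deferred into a returned
// DataFrame is not measured here.
def timeIt[T](latencies: mutable.Map[String, Long], key: String)(body: => T): T = {
  val startNs = System.nanoTime()
  try body
  finally latencies(key) = (System.nanoTime() - startNs) / 1000000L // record ms
}

val latencies = mutable.Map[String, Long]()
val result = timeIt(latencies, "getBatchLatency") { (1 to 1000).sum }
```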
Test build #3335 has finished for PR 15307 at commit
Test build #66854 has finished for PR 15307 at commit
Test build #66858 has finished for PR 15307 at commit
Test build #66859 has finished for PR 15307 at commit
@marmbrus
Test build #66878 has finished for PR 15307 at commit
Test build #66877 has finished for PR 15307 at commit
Test build #66879 has finished for PR 15307 at commit
This LGTM as a first cut. Thanks for working on it.

@marmbrus @brkyvz @zsxwing @koeninger Thank you very much for your feedback. I am going to merge this to master. I will open a different PR to merge this to branch-2.0.
Author: Tathagata Das <tathagata.das1565@gmail.com> Closes apache#15307 from tdas/SPARK-17731.
This PR seems to cause intermittent test failures, e.g.: https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/1736/testReport/junit/org.apache.spark.sql.streaming/StreamingQueryListenerSuite/single_listener__check_trigger_statuses/
I am disabling the test; it's failing too many times. Funny, it never failed in the 10 or so times I ran it in Jenkins in this PR.
## What changes were proposed in this pull request?

Ignoring the flaky test introduced in #15307
https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/1736/testReport/junit/org.apache.spark.sql.streaming/StreamingQueryListenerSuite/single_listener__check_trigger_statuses/

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15491 from tdas/metrics-flaky-test.
**This PR adds the same metrics to branch-2.0 that were added to master in #15307.** The differences compared to #15307 are:

- The new configuration is added only in the `SQLConf` object (i.e. `SQLConf.STREAMING_METRICS_ENABLED`) and not in the `SQLConf` class (i.e. no `SQLConf.isStreamingMetricsEnabled`). Spark master has all the streaming configurations exposed as actual fields in the SQLConf class (e.g. [streamingPollingDelay](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L642)), but [not in Spark 2.0](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L608). So I didn't add it in this 2.0 PR.
- In the previous master PR, the above configuration was read in `StreamExecution` as `sparkSession.sessionState.conf.isStreamingMetricsEnabled`. In this 2.0 PR, I am instead reading it as `sparkSession.conf.get(STREAMING_METRICS_ENABLED)` (i.e. no `sessionState`) to keep it consistent with how other confs are read in `StreamExecution` (e.g. [STREAMING_POLLING_DELAY](https://github.com/tdas/spark/blob/ee8e899e4c274c363a8b4d13e8bf57b0b467a50e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L62)).
- Different Mima exclusions

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15472 from tdas/SPARK-17731-branch-2.0.
## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

### New APIs

- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from `StreamingQueryInfo`, see later)
- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
  - ~~outputRate~~ - *Does not work with wholestage codegen*
  - latency - Current average latency between the data being available in source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, wal writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations
- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the source
  - processingRate - Current rate (rows/sec) at which the query is processing data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
- Python API for `StreamingQuery.status()`

### Breaking changes to existing APIs

**Existing direct public facing APIs**

- Deprecated direct public-facing APIs `StreamingQuery.sourceStatuses` and `StreamingQuery.sinkStatus` in favour of `StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have it deprecated, master should have it removed.

**Existing advanced listener APIs**

- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with `SourceStatus`, `SinkStatus`
  - Earlier, StreamingQueryInfo was used only in the advanced listener API, but now it is used in the direct public-facing API (StreamingQuery.status)
- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, `QueryTerminated` changed to have the name `queryStatus` and the return type `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` was `Option[String]`; converted it to `String`.
- For `SourceStatus` and `SinkStatus`, made the constructor private instead of private[sql] to make them more Java-safe. Instead added `private[sql] object SourceStatus/SinkStatus.apply()`, which are harder to accidentally use in Java.

## How was this patch tested?

Old and new unit tests.

- Rate calculation and other internal logic of StreamMetrics tested by StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are in source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, etc.

Metrics also manually tested using the Ganglia sink.