Kafka add counters v1 uw2 #33503
base: master
Conversation
Force-pushed 5b28665 to 5f28d22
Run Java PreCommit
Run Java_IOs_Direct PreCommit
Run Java_Pulsar_IO_Direct PreCommit
Run Python_Coverage PreCommit
Run Java PreCommit
2 similar comments
Run Java PreCommit
Run Java PreCommit
tests failing due to #28957
Run Java PreCommit
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers.
@johnjcasey @sjvanrossum flaky tests - want to get the review started in the meantime, but will wait on the tests to pass before I merge.
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer: R: @kennknowles for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Run Java PreCommit
2 similar comments
Run Java PreCommit
Run Java PreCommit
Would you mind adding some more context on what this change is doing?
2 questions:
Run Java PreCommit
Run GoPortable PreCommit
Run Go PreCommit
Run PythonDocker PreCommit 3.12
Run Java PreCommit
Reopening this. It consistently fails due to a known flaky test: #28957
Force-pushed 1861d3b to 83718b3
Run Java PreCommit
Run Prism_Python PreCommit 3.12
Run Java_GCP_IO_Direct PreCommit
// Based on the namespace, add a per-worker metrics label to enable separate
// runner-based sink processing.
if (metricName.getNamespace().equals("BigQuerySink")
It seems like this would be cleaner if it was done at the call-site when creating the metric.
That would make the behavior more obvious there and less dependent on a magic constant, and it would allow us to support some Kafka metrics per-worker but others aggregated, or to add per-worker metrics to a namespace that already has metrics.
It seems like we could either add it to MetricKey, or plumb it into the cell and then update the function where we generate metadata to base it on that instead.
Done. Removed BQ sink support; that can be done later. I've added it to MetricName.
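For context, a minimal sketch of the call-site approach under discussion, assuming a hypothetical label key; the "KafkaSink" namespace, the name format, and the PER_WORKER_METRIC constant are illustrative, and the actual change to MetricName in this PR may look different:

import org.apache.beam.sdk.metrics.MetricName;

import java.util.Collections;
import java.util.Map;

final class PerWorkerMetricNames {
  // Hypothetical label key; the PR defines the real constant.
  static final String PER_WORKER_LABEL = "PER_WORKER_METRIC";

  // Tag the metric as per-worker where it is created, instead of matching
  // on a magic namespace constant downstream.
  static MetricName backlogBytes(String topic, int partition) {
    return MetricName.named(
        "KafkaSink", String.format("backlogBytes-%s-%d", topic, partition));
  }

  // Metadata the runner would consult when deciding whether to aggregate
  // the metric across workers or report it per worker.
  static Map<String, String> perWorkerLabels() {
    return Collections.singletonMap(PER_WORKER_LABEL, "true");
  }
}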
@@ -332,7 +332,6 @@ public long getSplitBacklogBytes() {
      if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
        return UnboundedReader.BACKLOG_UNKNOWN;
      }
-     backlogBytes += pBacklog;
revert?
Thanks for catching!
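For clarity, a minimal sketch of the aggregation being restored; PartitionState and approxBacklogInBytes are illustrative stand-ins, not the actual reader internals:

import java.util.List;

class BacklogSketch {
  static final long BACKLOG_UNKNOWN = -1L; // mirrors UnboundedReader.BACKLOG_UNKNOWN

  // Illustrative stand-in for the reader's per-partition state.
  interface PartitionState {
    long approxBacklogInBytes();
  }

  static long getSplitBacklogBytes(List<PartitionState> partitions) {
    long backlogBytes = 0;
    for (PartitionState p : partitions) {
      long pBacklog = p.approxBacklogInBytes();
      if (pBacklog == BACKLOG_UNKNOWN) {
        // Any unknown partition makes the whole split's backlog unknown.
        return BACKLOG_UNKNOWN;
      }
      backlogBytes += pBacklog; // the line the review asked to restore
    }
    return backlogBytes;
  }
}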
@Override
public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
  Gauge perPartition =
      Metrics.gauge(
I.e., can we create the gauge here with the getPerWorkerGauge method, and then ensure that those metrics have the per-worker metadata populated appropriately?
We create it as a regular gauge and then add per-worker metadata to differentiate between the two (with the metric labels). If we added a new API here, it would also have to be added to every other derived metric (and then we'd likely want per-worker versions of other sorts of metrics too). Keeping the original set (gauge, histogram, counter, etc.) and adding metadata for runner aggregation might be a cleaner approach?
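A minimal sketch of the approach described above, using the existing Gauge API; the "KafkaSink" namespace and the metric name format are assumptions for illustration:

import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;

class KafkaSinkMetricsSketch {
  // Record per-partition backlog through the ordinary Gauge API; the runner
  // distinguishes per-worker metrics by metadata attached to the metric name,
  // not by a separate per-worker gauge type.
  public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
    Gauge perPartition =
        Metrics.gauge("KafkaSink", String.format("backlogBytes-%s-%d", topicName, partitionId));
    perPartition.set(backlogBytes);
  }
}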
Force-pushed 1d17345 to 168d1aa
Run Java PreCommit
Run Java_IOs_Direct PreCommit
1 similar comment
Run Java_IOs_Direct PreCommit
Force-pushed 168d1aa to c107437
Run PythonDocker PreCommit 3.10
Run Prism_Python PreCommit 3.12
Run Java PreCommit
Run Java_GCP_IO_Direct PreCommit
Run Java PreCommit
Add per-worker metadata on metrics propagated over the FnApi. This is on top of #33408.
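As a rough sketch of what per-worker metadata over the FnApi could look like at the proto level, a label is attached to a MonitoringInfo before it is reported; the "PER_WORKER_METRIC" key is a hypothetical name used for illustration, not necessarily the one the PR defines:

import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;

class MonitoringInfoSketch {
  // Attach a per-worker label to an existing MonitoringInfo; labels is a
  // plain string map on the proto, so no new metric type is needed.
  static MonitoringInfo withPerWorkerLabel(MonitoringInfo info) {
    return info.toBuilder().putLabels("PER_WORKER_METRIC", "true").build();
  }
}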