-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: pull query bandwidth based throttling #7738
Conversation
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/impl/QueryEndpoint.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/api/server/SlidingWindowRateLimiter.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Outdated
Show resolved
Hide resolved
...p/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
Outdated
Show resolved
Hide resolved
...-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/PullQueryPublisher.java
Outdated
Show resolved
Hide resolved
ksqldb-rest-app/src/test/java/io/confluent/ksql/api/server/SlidingWindowRateLimiterTest.java
Outdated
Show resolved
Hide resolved
...pp/src/test/java/io/confluent/ksql/api/integration/PullBandwidthThrottleIntegrationTest.java
Show resolved
Hide resolved
ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @cprasad1 sorry I'm late on this. I made a quick pass on the non-testing part and just left some quick comments. Please feel free to ignore as the PR is already merged anyways.
checkArgument(responseSizeInBytes >= 0, | ||
"Response size can't be negative."); | ||
|
||
responseSizesLog.add(new Pair<>(timestamp, responseSizeInBytes)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just wearing my paranoid window processing interviewer hat here :) Just in case we may have a time shift from Time.SYSTEM.milliseconds()
, maybe do a quick sorted insert from the head than blinding adding at the head? In practice most of it would just end at the head so perf-wise it should be fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the purpose of the RateLimiter was to bound the bandwidth usage approximately over the span of an hour (on that time-scale, I don't see a significant time shift happening), I think the current algorithm will work just fine. Do you think it is worth making this optimization?
@@ -218,6 +224,7 @@ private QueryPublisher createPullQueryPublisher( | |||
metrics.recordRowsProcessed( | |||
Optional.ofNullable(r).map(PullQueryResult::getTotalRowsProcessed).orElse(0L), | |||
sourceType, planType, routingNodeType); | |||
pullBandRateLimiter.add(responseBytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need to call system time
to record metrics above, maybe we can just get one system time once to save the number of currentSystimeMillis?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto for other add
calls where we would potentially get systemTime
multiple times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keeping the logic of getting the systemTime
internal to the pullBandRateLimiter
seemed like a cleaner design. That way the common method interface is simpler and someone cannot pass in a bad value. It does come at the expense of potentially multiple redundant function calls.
It is also an attempt at decoupling the metrics reporting logic and bandwidth limiting logic. We are already piggy-backing on the metrics reporting framework for limiting the bandwidth, and I did not want further coupling between the two.
Description
Limits the amount of bandwidth used by Pull Queries using a Sliding Window Log. The limit (in MB per hour) can be set using the
ksql.query.pull.max.hourly.bandwidth
config. Once the limit is hit, pull queries will fail immediately.Testing done
Reviewer checklist