-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Adds ability to bypass cache for pull queries #6891
Conversation
@confluentinc It looks like @cprasad1 just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add additional tests to WindowStoreCacheBypassTest
and SessionStoreCacheBypassTest
?
.../java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedSessionTable.java
Outdated
Show resolved
Hide resolved
...n/java/io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTable.java
Show resolved
Hide resolved
...main/java/io/confluent/ksql/execution/streams/materialization/ks/WindowStoreCacheBypass.java
Show resolved
Hide resolved
Objects.requireNonNull(keyTo, "upper key can't be null"); | ||
final List<ReadOnlyWindowStore<GenericKey, ValueAndTimestamp<GenericRow>>> stores | ||
= getStores(store); | ||
for (final ReadOnlyWindowStore<GenericKey, ValueAndTimestamp<GenericRow>> windowStore :stores) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can probably put this for loop in a method that takes a lambda:
findFirstNonEmptyIterator(... stores, Function<ReadOnlyWindowStore<GenericKey, ValueAndTimestamp<GenericRow>>, KeyValueIterator<Windowed<GenericKey>> func) {
for (...) {
}
}
You might have to make a generic parameter of the method if you want to handle the WindowStoreIterator as well, but it could simplify the code a bit more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have tried to implement that, but it required a bit of type casting and I am nervous about that
...ain/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java
Outdated
Show resolved
Hide resolved
...ain/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java
Outdated
Show resolved
Hide resolved
...ain/java/io/confluent/ksql/execution/streams/materialization/ks/SessionStoreCacheBypass.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @cprasad1 !
Overall, this PR LGTM. Once you've addressed @AlanConfluent 's feedback, I'm ok to merge.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, LGTM
Description
Adds ability to bypass cache for pull queries
Testing done
Updated existing unit tests
Reviewer checklist