Mongo window range queries #224
base: main
Conversation
final var segmentWindows = partitionSegments.segmentWindows.get(segment);

// Since we use a flat keyspace by concatenating the timestamp with the data key and have
// variable length data keys, it's impossible to request only valid data that's within
Question to clarify my understanding: why does the variability of the length of the data key matter? Even if we knew all the keys were the same size, this filter would still pick up too much data, right? If the key is first, you'd still pick up a bunch of time windows you don't want. And if the time is first, you'd still pick up a bunch of keys you don't want.
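To make the over-selection concrete, here's a small self-contained sketch (illustrative only, not code from this PR) of how composite keys sort in a flat keyspace. With a timestamp-first layout, even fixed-size keys for the wrong data key fall inside the naive range bounds:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: digit = timestamp, letter = data key, so each string
// models one composite [timestamp][key] entry in a flat, lexicographically
// sorted keyspace.
public class CompositeRangeDemo {
  public static void main(String[] args) {
    final List<String> store = new ArrayList<>(
        List.of("1a", "1b", "2a", "2b", "3a", "3b"));
    Collections.sort(store); // lexicographic order, like the flat keyspace

    // naive single contiguous range for key 'a' only, timestamps 1..3
    final String lo = "1a";
    final String hi = "3a";
    for (final String k : store) {
      if (k.compareTo(lo) >= 0 && k.compareTo(hi) <= 0) {
        // prints 1a, 1b, 2a, 2b, 3a -- "1b" and "2b" are strays for key 'b'
        System.out.println(k);
      }
    }
  }
}

Variable-length keys only make the bounds fuzzier, since there's no fixed offset at which the key suffix ends.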
@@ -483,7 +509,35 @@ public KeyValueIterator<WindowedKey, byte[]> fetchAll(
      final long timeFrom,
      final long timeTo
  ) {
-   throw new UnsupportedOperationException("fetchAll not yet supported for Mongo backends");
+   if (!timestampFirstOrder) {
+     throw new UnsupportedOperationException("Range queries such as fetchAll require stores to be "
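For reference, a minimal sketch (helper and field names are assumptions, not the PR's actual code; WindowDoc is the PR's document class) of why timestamp-first order admits exact fetchAll borders: with a layout of [8-byte big-endian timestamp][variable-length key], the half-open range below contains exactly the entries with timestamps in [timeFrom, timeTo], no matter how long each key suffix is.

import java.nio.ByteBuffer;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;

// Sketch only: assumes the composite key is stored as the binary _id and
// that a big-endian timestamp prefix sorts numerically (true for
// non-negative timestamps).
final class FetchAllBounds {

  static byte[] timestampPrefix(final long timestamp) {
    return ByteBuffer.allocate(Long.BYTES).putLong(timestamp).array();
  }

  static FindIterable<WindowDoc> fetchAllInRange(
      final MongoCollection<WindowDoc> segmentWindows,
      final long timeFrom,
      final long timeTo
  ) {
    // inclusive lower bound: timeFrom followed by the empty key
    final byte[] from = timestampPrefix(timeFrom);
    // exclusive upper bound: the first possible composite key of timeTo + 1
    final byte[] to = timestampPrefix(timeTo + 1);
    return segmentWindows.find(
        Filters.and(
            Filters.gte("_id", from),
            Filters.lt("_id", to)));
  }
}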
Is this because Mongo doesn't let you scan the whole table?
I guess this was a "prevent users from shooting themselves in the foot" moment -- it's possible, just really inefficient, since we have to scan everything in between (see the comment about variable-length keys).
Of course, if we're going to go the route of scanning everything in order to enable partition scaling, then this constraint doesn't make any sense. I'll remove it.
final FindIterable<WindowDoc> fetchResults = segmentWindows.find(
    Filters.and(
I'm wondering if we should include these fields outside of the key and filter on them server-side. This has the potential to jack up our data transfer costs.
Not sure I understand the question -- what are "these fields"?
And what do you mean by including them "outside the key" -- are you talking about the composite key, i.e. moving the timestamp to a separate field rather than keeping it in the composite? I don't see how that would affect our data transfer costs for range queries, since in all cases where we return the key we would need to return the timestamp as well.
I'm wondering if there might be a case for point lookups, though. Since AFAICT there's no way to return a doc without sending the key/ID, whereas if the timestamp was an additional field then maybe it's possible to filter that out. Is that what you're getting at?
Is it even true that there's no way to do a point lookup without returning the key/ID in the response? I'm assuming that it's possible to filter out other fields besides the key, but even that I don't know for sure 🤔
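For what it's worth, the driver does support trimming fields server-side via projections, including dropping _id, so a point lookup needn't ship the composite key back. A sketch with illustrative field and collection names (not from this PR):

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import org.bson.Document;

// Sketch: a point lookup that returns only the value field; "_id" is
// excluded server-side so the composite key isn't sent over the wire.
final class PointLookup {
  static Document valueOnly(
      final MongoCollection<Document> collection,
      final byte[] compositeKey
  ) {
    return collection
        .find(Filters.eq("_id", compositeKey))
        .projection(Projections.fields(
            Projections.include("value"),
            Projections.excludeId()))
        .first();
  }
}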
@Override
public KeyValueIterator<WindowedKey, byte[]> all(
    final int kafkaPartition,
    final long streamTime
Don't we need to filter on this time?
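If the intent of the question is expiring old windows against stream-time, a speculative sketch of that filter (retentionPeriod and the timestamp-first "_id" layout are assumptions, not the PR's actual fields):

import java.nio.ByteBuffer;
import com.mongodb.client.model.Filters;
import org.bson.conversions.Bson;

// Speculative sketch: with a timestamp-first composite key, windows that
// have expired relative to observed stream-time can be excluded with a
// single extra lower bound on _id.
final class ExpiryFilter {
  static Bson notExpired(final long streamTime, final long retentionPeriod) {
    final long minValidTimestamp = streamTime - retentionPeriod;
    final byte[] lowerBound = ByteBuffer.allocate(Long.BYTES)
        .putLong(minValidTimestamp)
        .array();
    return Filters.gte("_id", lowerBound);
  }
}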
@@ -457,13 +459,37 @@ public KeyValueIterator<WindowedKey, byte[]> backFetch(
  @Override
  public KeyValueIterator<WindowedKey, byte[]> fetchRange(
      final int kafkaPartition,
-     final Bytes fromKey,
-     final Bytes toKey,
+     final Bytes keyFrom,
A question about the state store API in general: what are these cross-key APIs actually used for? All the Kafka Streams processors are scoped to one key, AFAIK.
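For context (my understanding, not something this PR states): the cross-key range methods are primarily exercised by Kafka Streams interactive queries, where callers scan a window store from outside the topology, e.g.:

import java.time.Instant;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;

// Interactive-query path that ends up in fetchAll; "counts-store" is a
// placeholder store name.
final class IqExample {
  static void scanAllKeys(final KafkaStreams streams, final Instant from, final Instant to) {
    final ReadOnlyWindowStore<String, Long> store = streams.store(
        StoreQueryParameters.fromNameAndType(
            "counts-store", QueryableStoreTypes.windowStore()));
    try (final KeyValueIterator<Windowed<String>, Long> iter = store.fetchAll(from, to)) {
      while (iter.hasNext()) {
        final KeyValue<Windowed<String>, Long> entry = iter.next();
        System.out.println(entry.key + " -> " + entry.value);
      }
    }
  }
}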
Finish the implementation for the remaining range queries of MongoWindowedTable.
The two queries take slightly different approaches, as documented in code comments for each. Basically, for the all-keys time-range scan we can identify exact borders and issue a precise range query for only valid data. However, for the key-and-time range scan, we have to scan over all keys and post-filter them when timestamp-first order is used (and vice versa for key-first order). This is due to the byte layout of the composite primary keys, and the fact that key bytes are variable-length.
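To illustrate the post-filter half of that description, a compact sketch assuming timestamp-first order (helper names are illustrative, not the PR's): the server-side range is precise in time but covers all keys, so each returned composite key's variable-length suffix is checked client-side against [keyFrom, keyTo].

import java.util.Arrays;

// Illustrative client-side post-filter for fetchRange with a timestamp-first
// layout of [8-byte timestamp][variable-length key].
final class KeyPostFilter {
  static boolean keyInRange(
      final byte[] compositeKey,
      final byte[] keyFrom,
      final byte[] keyTo
  ) {
    // strip the fixed-size timestamp prefix to recover the data key
    final byte[] dataKey =
        Arrays.copyOfRange(compositeKey, Long.BYTES, compositeKey.length);
    // lexicographic unsigned comparison, matching flat-keyspace order
    return Arrays.compareUnsigned(dataKey, keyFrom) >= 0
        && Arrays.compareUnsigned(dataKey, keyTo) <= 0;
  }
}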