-
Notifications
You must be signed in to change notification settings - Fork 542
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
BlockBuilder: Refactor the consume cycle logic #8329
Conversation
331ea4b
to
712b67d
Compare
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
712b67d
to
af11016
Compare
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
@@ -28,7 +28,7 @@ import ( | |||
) | |||
|
|||
type builder interface { | |||
process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error) | |||
process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the name because technically it is not a blockMin, rather the max of the last cycle
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
pkg/blockbuilder/blockbuilder.go
Outdated
for i := range req.Topics[0].Partitions { | ||
if req.Topics[0].Partitions[i].Partition == commitRec.Partition { | ||
req.Topics[0].Partitions[i].Metadata = &meta |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed the commit metadata. Debugging through the kgo library I saw that this string was not being overridden. By default kgo sets it to member ID (which is what we were seeing).
See 3e5e03f for the fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have written unit tests for this. I will clean it up and push it as well.
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
@narqo this moves some logic out of
consumePartition
and into thenextConsumeCycle
. Main aim of this is to makeconsumePartition()
simpler where it just starts consuming from the partition until the given cycle end time. We also pass the block start and end times, and the last seen record as part of arguments.We now do all the heavy lifting in nextConsumeCycle, where we call
consumePartition()
in parts when it is lagging. We calculate the timestamps appropriately.