Commit

BlockBuilder: Refactor the consume cycle logic
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
codesome committed Jun 10, 2024
1 parent 68a6dd8 commit 331ea4b
Showing 3 changed files with 119 additions and 64 deletions.
138 changes: 98 additions & 40 deletions pkg/blockbuilder/blockbuilder.go
@@ -274,11 +274,72 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
return nil
}

// TODO(v): rebalancing can happen between the calls to consumePartitions; if that happens, the instance may lose
// TODO(v): rebalancing can happen between the calls to consumePartition; if that happens, the instance may lose
// the ownership of some of its partitions
// TODO(v): test for this case

commitOffsets, err := kadm.NewClient(b.kafkaClient).ListCommittedOffsets(ctx, b.cfg.Kafka.Topic)
if err != nil {
return fmt.Errorf("get committed offsets: %w", err)
}
for part, lag := range parts {
err := b.consumePartitions(ctx, cycleEnd, part, lag)
// We look at the commit offset timestamp to determine how far behind we are lagging
// relative to the cycleEnd. We consume the partition in parts accordingly.
offset, ok := commitOffsets.Lookup(b.cfg.Kafka.Topic, part)
if ok {
commitTime := time.UnixMilli(offset.Timestamp)
if cycleEnd.Sub(commitTime) > 3*b.cfg.ConsumeInterval/2 {
// If the commit timestamp lags cycleEnd by more than 1.5 times the consume interval,
// we are lagging behind, so we consume the partition in parts, one consume interval
// at a time, instead of catching up in a single oversized cycle.
cycleEndStartAt := commitTime.Truncate(b.cfg.ConsumeInterval).Add(b.cfg.ConsumeInterval + b.cfg.ConsumeIntervalBuffer)

// We iterate through all the cycleEnds starting from the first one after commitTime until the cycleEnd.
for ce := cycleEndStartAt; cycleEnd.Sub(ce) >= 0; ce = ce.Add(b.cfg.ConsumeInterval) {
// TODO: Get lastBlockEndAt and seenTs from commit metadata
lastBlockEndAt := time.Now()
seenTs := int64(0) // Kafka record timestamp till which records were processed before.
// TODO(codesome): resume start from the last commit point. See how to do it.
// TODO(codesome): since we will resume from last commit, the lag might be wrong.
// consumePartition() should ideally return the lag point for the commit record.
// Or we can get the updated lag from kafka instead.
lag, err = b.consumePartition(ctx, part, lag, time.UnixMilli(seenTs), lastBlockEndAt, ce)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
}
if ce.Compare(cycleEnd) != 0 && ce.Add(b.cfg.ConsumeInterval).After(cycleEnd) {
ce = cycleEnd
}

// Refresh the commit offsets.
commitOffsets, err = kadm.NewClient(b.kafkaClient).ListCommittedOffsets(ctx, b.cfg.Kafka.Topic)
if err != nil {
return fmt.Errorf("get committed offsets: %w", err)
}
offset, ok = commitOffsets.Lookup(b.cfg.Kafka.Topic, part)
if !ok {
// We expect a commit here. It is an error if there is none.
return fmt.Errorf("commit offset not found for topic %q partition %d", b.cfg.Kafka.Topic, part)
}
}

// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
// this partition's offset to the group.
// TODO(v): test for this case
b.kafkaClient.AllowRebalance()
continue
}
}

// Either we did not find a commit offset or we are not lagging behind by
// more than 1.5 times the consume interval.
lastBlockEndAt := cycleEnd.Truncate(b.cfg.ConsumeInterval).Add(-b.cfg.ConsumeInterval)
seenTs := int64(0) // Kafka record timestamp till which records were processed before.
if ok {
// TODO(codesome): get lastBlockEndAt and seenOffset from commit metadata
}

_, err = b.consumePartition(ctx, part, lag, time.UnixMilli(seenTs), lastBlockEndAt, cycleEnd)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
}
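
For reference, the catch-up schedule implied by the loop above can be restated as a small standalone sketch. Nothing below is part of the commit: the helper name, the plain time/duration parameters, and the snapping of the final step to cycleEnd are illustrative assumptions about the intent of the code.

package main

import (
	"fmt"
	"time"
)

// catchUpCycleEnds mirrors the intent of the catch-up loop above: when the committed
// record timestamp lags cycleEnd by more than 1.5x the consume interval, the partition
// is consumed in per-interval steps rather than in one oversized cycle, and the final
// step is snapped to cycleEnd.
func catchUpCycleEnds(commitTime, cycleEnd time.Time, interval, buffer time.Duration) []time.Time {
	if cycleEnd.Sub(commitTime) <= 3*interval/2 {
		// Not lagging far enough to split the work: a single consumption up to cycleEnd.
		return []time.Time{cycleEnd}
	}
	// First intermediate cycle end after the commit timestamp, padded by the buffer.
	ce := commitTime.Truncate(interval).Add(interval + buffer)
	var ends []time.Time
	for ; ce.Before(cycleEnd); ce = ce.Add(interval) {
		ends = append(ends, ce)
	}
	return append(ends, cycleEnd)
}

func main() {
	commit := time.Date(2024, 6, 10, 0, 12, 0, 0, time.UTC) // timestamp of the last committed record
	end := time.Date(2024, 6, 10, 6, 0, 0, 0, time.UTC)     // requested cycleEnd
	for _, ce := range catchUpCycleEnds(commit, end, time.Hour, 15*time.Minute) {
		fmt.Println(ce.Format(time.RFC3339))
	}
}

With a one-hour consume interval and a 15-minute buffer, a commit at 00:12 against a 06:00 cycleEnd yields intermediate steps at 01:15 through 05:15 and a final step at 06:00.
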
@@ -291,7 +352,18 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
return nil
}

func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time, part int32, lag int64) (err error) {
// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, the caller of consumePartition needs to take care of
// calling consumePartition in parts.
// consumePartition returns the updated lag after consuming the partition.
func (b *BlockBuilder) consumePartition(
ctx context.Context,
part int32,
lag int64,
seenTillTs, // Kafka record timestamp till which records were processed before.
lastBlockEnd, // blockEndAt associated with the previous commit
cycleEnd time.Time,
) (_ int64, err error) {
// TopicPartition to resume consuming on this iteration.
// Note: pause/resume is a client-local state. On restart or a crash, the client will be assigned its share of partitions,
// during consumer group's rebalancing, and it will continue consuming as usual.
@@ -306,29 +378,24 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
}(time.Now())

builder := b.tsdbBuilder()
defer builder.close()

blockStartAt, blockEndAt := blockBounds(cycleEnd, b.cfg.ConsumeInterval)
if blockEndAt.Before(cycleEnd) {
// This should never happen.
panic(fmt.Errorf("block bounds [%s, %s] outside of cycle %s", blockStartAt, blockEndAt, cycleEnd))
}
defer builder.close() // TODO: handle error

var (
done bool

done bool
firstUncommittedRec *kgo.Record
checkpointRec *kgo.Record
lastRec *kgo.Record
lastBlockMax = lastBlockEnd.UnixMilli()
blockMax = cycleEnd.Truncate(b.cfg.ConsumeInterval).UnixMilli()
)
for !done {
// Limit time client waits for a new batch. Otherwise, the client will hang if it lands on an inactive partition.
ctx1, cancel := context.WithTimeout(ctx, 5*time.Second)
fetches := b.kafkaClient.PollFetches(ctx1)
cancel()

if fetches.IsClientClosed() {
level.Warn(b.logger).Log("msg", "client closed when fetching records")
return nil
return lag, nil
}

fetches.EachError(func(_ string, part int32, err error) {
@@ -358,47 +425,38 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
break
}

// TODO(v): refactor block bounds check into a state machine:
// During catching up, if builder skipped samples, roll back the offset to first uncommitted after compacting and starting next block
if rec.Timestamp.Before(blockStartAt) || rec.Timestamp.After(blockEndAt) {
// When BB is first deployed or if it is lagging behind, it might be consuming data from too far
// in the past. If we try to consume it all at once, it can overwhelm the system,
// so we break this into multiple block building cycles.
if rec.Timestamp.After(blockEndAt) {
if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
return fmt.Errorf("compact tsdb builder: %w", err)
}
}
blockStartAt, blockEndAt = blockBounds(rec.Timestamp, b.cfg.ConsumeInterval)
}
lastRec = rec

level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", blockStartAt, "bmax", blockEndAt)
level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", lastBlockEnd, "bmax", blockEndAt)

recProcessedBefore := false // TODO: get this from kafka commit
blockMin, blockMax := blockStartAt.UnixMilli(), blockEndAt.UnixMilli()
allSamplesProcessed, err := builder.process(ctx, rec, blockMin, blockMax, recProcessedBefore)
recProcessedBefore := seenTillTs.After(rec.Timestamp)
allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore)
if err != nil {
level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err)
// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
// TODO(codesome): add metric
} else if allSamplesProcessed {
// TODO(v): only advance checkpoint when all previously seen records from this cycle were processed
}
if !allSamplesProcessed && checkpointRec == nil {
// First record where all samples were not processed becomes the checkpoint/commit point.
checkpointRec = rec
} else {
if firstUncommittedRec == nil {
firstUncommittedRec = rec
}
}
if firstUncommittedRec == nil {
firstUncommittedRec = rec
}
}
}

if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
// TODO(codesome): add metric
return err
return lag, err
}

if checkpointRec == nil {
// All samples in all records were processed. We can commit the last record's offset.
checkpointRec = lastRec
}

// TODO(codesome): Make sure all the blocks have been shipped before committing the offset.
return b.finalizePartition(ctx, firstUncommittedRec, checkpointRec)
return lag, b.finalizePartition(ctx, firstUncommittedRec, checkpointRec)
}

func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string) blockUploader {
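
The commit-point selection inside consumePartition is easy to miss among the diff hunks, so here is an illustrative restatement. It is not code from the commit: the function name, the package placement, and the slice-based inputs (assumed to be the same length) are assumptions made for the sketch.

package blockbuilder

import "github.com/twmb/franz-go/pkg/kgo"

// chooseCommitRecord restates the checkpointing rule from consumePartition above: the first
// record whose samples were not all written to the current block becomes the commit point,
// so a restart replays it; if every record was fully processed, the last record of the cycle
// is committed instead.
func chooseCommitRecord(recs []*kgo.Record, fullyProcessed []bool) *kgo.Record {
	var checkpoint, last *kgo.Record
	for i, rec := range recs {
		last = rec
		if checkpoint == nil && !fullyProcessed[i] {
			// First record that was not fully processed becomes the checkpoint.
			checkpoint = rec
		}
	}
	if checkpoint == nil {
		// All samples in all records were processed: commit the last record's offset.
		checkpoint = last
	}
	return checkpoint
}
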
37 changes: 17 additions & 20 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -48,15 +48,7 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
for i := int64(0); i < 10; i++ {
ts := testEpoch.Add(time.Duration(i/5) * time.Hour)
val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil)
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
Value: val,
Topic: testTopic,
Partition: int32(i % numPartitions), // samples in this batch are split between N partitions
}
produceResult := writeClient.ProduceSync(ctx, rec)
require.NoError(t, produceResult.FirstErr())
produceRecord(ctx, t, writeClient, ts, testTopic, int32(i%numPartitions), userID, val)
}

cfg := Config{
@@ -117,17 +109,9 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
for i := 0; i < 10; i++ {
ts := testEpoch.Add(3 * time.Hour)
val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil)
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
Value: val,
Topic: testTopic,
Partition: 1, // these samples are only for first partition
}
produceResult := writeClient.ProduceSync(ctx, rec)
require.NoError(t, produceResult.FirstErr())

lastProducedOffest = produceResult[0].Record.Offset
// these samples are only for first partition
pr := produceRecord(ctx, t, writeClient, ts, testTopic, 1, userID, val)
lastProducedOffest = pr[0].Record.Offset
}

cycleEnd := testEpoch.Add(4 * time.Hour)
@@ -148,6 +132,19 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
require.Equal(t, lastProducedOffest+1, offset.Offset) // +1 because lastProducedOffset points at already consumed record
}

func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, ts time.Time, topicName string, partitionID int32, userID string, content []byte) kgo.ProduceResults {
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
Value: content,
Topic: topicName,
Partition: partitionID,
}
produceResult := writeClient.ProduceSync(ctx, rec)
require.NoError(t, produceResult.FirstErr())
return produceResult
}

func newKafkaProduceClient(t *testing.T, addrs ...string) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(addrs...),
8 changes: 4 additions & 4 deletions pkg/blockbuilder/tsdb.go
@@ -28,7 +28,7 @@ import (
)

type builder interface {
process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error)
process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error)
compactAndUpload(ctx context.Context, blockUploaderForUser func(context.Context, string) blockUploader) error
close() error
}
@@ -65,7 +65,7 @@ func newTSDBBuilder(logger log.Logger, limits *validation.Overrides, blocksStora
// where the sample was not put in the TSDB because it was discarded or was already processed before.
// lastBlockMax: max time ("end") of the block built in the previous block building cycle.
// blockMax: max time ("end") of the block we are building in this cycle.
func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error) {
func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) {
userID := string(rec.Key)

req := mimirpb.WriteRequest{}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl
allSamplesProcessed = false
continue
}
if recordProcessedBefore && s.TimestampMs < blockMin {
if recordProcessedBefore && s.TimestampMs < lastBlockMax {
// This sample was already processed in the previous cycle.
continue
}
Expand Down Expand Up @@ -147,7 +147,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl
allSamplesProcessed = false
continue
}
if recordProcessedBefore && h.Timestamp < blockMin {
if recordProcessedBefore && h.Timestamp < lastBlockMax {
// This sample was already processed in the previous cycle.
continue
}
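
The blockMin → lastBlockMax rename in tsdb.go changes only the parameter name, but the per-sample rule it participates in is worth spelling out. The sketch below is not from the commit: the blockMax guard is an assumption from context (its condition sits above the hunks shown), while the lastBlockMax branch mirrors the lines above; names and placement are illustrative.

package blockbuilder

// sampleAction sketches the per-sample decision in process() after the rename.
type sampleAction int

const (
	appendToBlock        sampleAction = iota // write the sample into the in-progress block
	skipAlreadyProcessed                     // already written by a previous cycle's block
	deferToNextCycle                         // beyond blockMax; leaves the record not fully processed
)

func classifySample(ts, lastBlockMax, blockMax int64, recordProcessedBefore bool) sampleAction {
	if ts >= blockMax {
		// Assumed guard: the sample belongs to a future block, so the record will be
		// revisited in a later cycle and allSamplesProcessed stays false.
		return deferToNextCycle
	}
	if recordProcessedBefore && ts < lastBlockMax {
		// Sample was already covered by the block built in the previous cycle.
		return skipAlreadyProcessed
	}
	return appendToBlock
}
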
