Skip to content

Commit

Permalink
blockbuilder: fix metadata commit (#8354)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
  • Loading branch information
narqo authored Jun 12, 2024
1 parent 204454c commit 140e7eb
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions pkg/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/mimir/pkg/storage/bucket"
Expand Down Expand Up @@ -212,7 +213,10 @@ func (b *BlockBuilder) running(ctx context.Context) error {
for {
select {
case cycleEnd := <-time.After(waitTime):
_ = b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second))
err := b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second))
if err != nil {
level.Error(b.logger).Log("msg", "consume cycle failed", "cycle_end", cycleEnd, "err", err)
}

// If we took more than consumptionItvl time to consume the records, this
// will immediately start the next consumption.
Expand Down Expand Up @@ -278,7 +282,7 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
// the ownership of some of its partitions
// TODO(v): test for this case

offsets, err := kadm.NewClient(b.kafkaClient).FetchOffsetsForTopics(ctx, b.cfg.Kafka.Topic)
offsets, err := kadm.NewClient(b.kafkaClient).FetchOffsetsForTopics(ctx, b.cfg.Kafka.ConsumerGroup, b.cfg.Kafka.Topic)
if err != nil {
return fmt.Errorf("fetch offsets for topics: %w", err)
}
Expand All @@ -289,11 +293,14 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
var commitRecTs, seenTillTs, lastBlockEnd int64
var commitRecTime time.Time
if ok {
level.Debug(b.logger).Log("part", offset.Partition, "offset", offset.At, "meta", offset.Metadata)
commitRecTs, seenTillTs, lastBlockEnd, err = unmarshallCommitMeta(offset.Metadata)
if err != nil {
return err
}
commitRecTime = time.UnixMilli(commitRecTs)
} else {
level.Warn(b.logger).Log("msg", "didn't find partition", "part", part, "offsets", fmt.Sprintf("%+v", offsets))
}

lagging := ok && cycleEnd.Sub(commitRecTime) > 3*b.cfg.ConsumeInterval/2
Expand Down Expand Up @@ -420,6 +427,7 @@ func (b *BlockBuilder) consumePartition(
allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore)
if err != nil {
level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err)
continue
// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
// TODO(codesome): add metric
}
Expand Down Expand Up @@ -490,23 +498,27 @@ func (b *BlockBuilder) finalizePartition(ctx context.Context, commitRec, lastRec
},
})

var off kadm.Offsets
off.Add(kadm.Offset{
Topic: commitRec.Topic,
Partition: commitRec.Partition,
At: commitRec.Offset,
LeaderEpoch: commitRec.LeaderEpoch,
Metadata: marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastRec.Timestamp.UnixMilli(), blockEnd),
ctx = kgo.PreCommitFnContext(ctx, func(req *kmsg.OffsetCommitRequest) error {
meta := marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastRec.Timestamp.UnixMilli(), blockEnd)
for _, topic := range req.Topics {
if topic.Topic != b.cfg.Kafka.Topic {
continue
}
for _, part := range req.Topics[0].Partitions {
if part.Partition == commitRec.Partition {
part.Metadata = &meta
}
}
}
level.Info(b.logger).Log("commit request", fmt.Sprintf("%+v", req))
return nil
})
committed, err := kadm.NewClient(b.kafkaClient).CommitOffsets(ctx, b.cfg.Kafka.ConsumerGroup, off)
err := b.kafkaClient.CommitRecords(ctx, commitRec)
if err != nil {
return err
} else if !committed.Ok() {
return committed.Error()
return fmt.Errorf("commit record with part %d, offset %d: %w", commitRec.Partition, commitRec.Offset, err)
}

committedOffset, _ := committed.Lookup(b.cfg.Kafka.Topic, commitRec.Partition)
level.Debug(b.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
level.Debug(b.logger).Log("msg", "successfully committed to Kafka", "part", commitRec.Partition, "offset", commitRec.Offset)

return nil
}
Expand Down

0 comments on commit 140e7eb

Please sign in to comment.