Commit 665b747

BlockBuilder: Refactor the consume cycle logic and add metadata to the kafka commit (#8329)

* BlockBuilder: Refactor the consume cycle logic

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix commit records and add metadata to the commit

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Use the information from the commit metadata

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* blockbuilder: fix metadata commit (#8354)

Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>

* Fix committing with metadata

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Further fix to committing with metadata

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Add test for metadata in commit

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

---------

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Signed-off-by: Vladimir Varankin <vladimir.varankin@grafana.com>
Co-authored-by: Vladimir Varankin <vladimir.varankin@grafana.com>
codesome and narqo authored Jun 12, 2024
1 parent 68a6dd8 commit 665b747
Showing 3 changed files with 272 additions and 98 deletions.
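For context on the metadata added to the Kafka commit in this change: it is a small versioned, comma-separated string carrying the committed record's timestamp, the timestamp of the last record processed, and the block end of the cycle (all in milliseconds). Below is a minimal, self-contained sketch of that round-trip, re-implementing the unexported helpers shown in the diff further down; the standalone package, main function, and timestamp values are illustrative only and not part of the commit.

package main

import "fmt"

const kafkaCommitMetaV1 = 1

// marshallCommitMeta packs the v1 metadata: version, commit record ts, last seen record ts, block end.
func marshallCommitMeta(commitRecTs, lastRecTs, blockEnd int64) string {
	return fmt.Sprintf("%d,%d,%d,%d", kafkaCommitMetaV1, commitRecTs, lastRecTs, blockEnd)
}

// unmarshallCommitMeta reads the version first, then parses the rest according to it.
func unmarshallCommitMeta(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) {
	var (
		version int
		s       string
	)
	if _, err = fmt.Sscanf(meta, "%d,%s", &version, &s); err != nil {
		return
	}
	switch version {
	case kafkaCommitMetaV1:
		_, err = fmt.Sscanf(s, "%d,%d,%d", &commitRecTs, &lastRecTs, &blockEnd)
	default:
		err = fmt.Errorf("unsupported commit meta version %d", version)
	}
	return
}

func main() {
	// Hypothetical timestamps, purely for illustration.
	meta := marshallCommitMeta(1718150400000, 1718150460000, 1718150400000)
	fmt.Println(meta) // 1,1718150400000,1718150460000,1718150400000
	fmt.Println(unmarshallCommitMeta(meta))
}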
244 changes: 169 additions & 75 deletions pkg/blockbuilder/blockbuilder.go
@@ -17,6 +17,7 @@ import (
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/plugin/kprom"

"github.com/grafana/mimir/pkg/storage/bucket"
@@ -59,7 +60,7 @@ func New(
return newTSDBBuilder(b.logger, b.limits, b.cfg.BlocksStorageConfig)
}

bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, reg)
bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "blockbuilder", logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create the bucket client: %w", err)
}
@@ -212,7 +213,10 @@ func (b *BlockBuilder) running(ctx context.Context) error {
for {
select {
case cycleEnd := <-time.After(waitTime):
_ = b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second))
err := b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second))
if err != nil {
level.Error(b.logger).Log("msg", "consume cycle failed", "cycle_end", cycleEnd, "err", err)
}

// If we took more than consumptionItvl time to consume the records, this
// will immediately start the next consumption.
@@ -274,14 +278,68 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
return nil
}

// TODO(v): rebalancing can happen between the calls to consumePartitions; if that happens, the instance may lose
// TODO(v): rebalancing can happen between the calls to consumePartition; if that happens, the instance may lose
// the ownership of some of its partitions
// TODO(v): test for this case

offsets, err := kadm.NewClient(b.kafkaClient).FetchOffsetsForTopics(ctx, b.cfg.Kafka.ConsumerGroup, b.cfg.Kafka.Topic)
if err != nil {
return fmt.Errorf("fetch offsets for topics: %w", err)
}
for part, lag := range parts {
err := b.consumePartitions(ctx, cycleEnd, part, lag)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
// We look at the commit offset timestamp to determine how far behind we are lagging
// relative to the cycleEnd. We consume the partition in parts accordingly.
offset, ok := offsets.Lookup(b.cfg.Kafka.Topic, part)
var commitRecTs, seenTillTs, lastBlockEnd int64
var commitRecTime time.Time
if ok {
level.Debug(b.logger).Log("part", offset.Partition, "offset", offset.At, "meta", offset.Metadata)
commitRecTs, seenTillTs, lastBlockEnd, err = unmarshallCommitMeta(offset.Metadata)
if err != nil {
return err
}
commitRecTime = time.UnixMilli(commitRecTs)
} else {
level.Warn(b.logger).Log("msg", "didn't find partition", "part", part, "offsets", fmt.Sprintf("%+v", offsets))
}

lagging := ok && cycleEnd.Sub(commitRecTime) > 3*b.cfg.ConsumeInterval/2
if !lagging {
// Either we did not find a commit offset or we are not lagging behind by
// more than 1.5 times the consume interval.
// When there is no kafka commit, we play safe and assume seenTillTs and
// lastBlockEnd were 0 to not discard any samples unnecessarily.
_, _, _, err = b.consumePartition(ctx, part, lag, seenTillTs, lastBlockEnd, cycleEnd)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
}

// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
// this partition's offset to the group.
// TODO(v): test for this case
b.kafkaClient.AllowRebalance()
continue
}

// We are lagging behind. We need to consume the partition in parts.

// We iterate through all the cycleEnds starting from the first one after commit until the cycleEnd.
cycleEndStartAt := commitRecTime.Truncate(b.cfg.ConsumeInterval).Add(b.cfg.ConsumeInterval + b.cfg.ConsumeIntervalBuffer)
for ce := cycleEndStartAt; cycleEnd.Sub(ce) >= 0; ce = ce.Add(b.cfg.ConsumeInterval) {
// Instead of looking for the commit metadata for each iteration, we use the data returned by consumePartition
// in the next iteration.
lag, seenTillTs, lastBlockEnd, err = b.consumePartition(ctx, part, lag, seenTillTs, lastBlockEnd, ce)
if err != nil {
level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
return err
}
// If adding the ConsumeInterval takes it beyond the cycleEnd, we set it to the cycleEnd to not
// exit the loop without consuming until cycleEnd.
if ce.Compare(cycleEnd) != 0 && ce.Add(b.cfg.ConsumeInterval).After(cycleEnd) {
ce = cycleEnd
}
}

// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
// this partition's offset to the group.
// TODO(v): test for this case
@@ -291,7 +349,21 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
return nil
}

func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time, part int32, lag int64) (err error) {
// consumePartition consumes records from the given partition until the cycleEnd timestamp.
// If the partition is lagging behind, the caller of consumePartition needs to take care of
// calling consumePartition in parts.
// consumePartition returns
// * retLag: updated lag after consuming the partition.
// * retSeenTillTs: timestamp of the last record processed (part of commit metadata).
// * retBlockMax: timestamp of the block end in this cycle (part of commit metadata).
func (b *BlockBuilder) consumePartition(
ctx context.Context,
part int32,
lag,
seenTillTs, // Kafka record timestamp till which records were processed before.
lastBlockMax int64, // blockEndAt associated with the previous commit
cycleEnd time.Time,
) (retLag, retSeenTillTs, retBlockMax int64, err error) {
// TopicPartition to resume consuming on this iteration.
// Note: pause/resume is a client-local state. On restart or a crash, the client will be assigned its share of partitions,
// during consumer group's rebalancing, and it will continue consuming as usual.
@@ -306,29 +378,23 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
}(time.Now())

builder := b.tsdbBuilder()
defer builder.close()

blockStartAt, blockEndAt := blockBounds(cycleEnd, b.cfg.ConsumeInterval)
if blockEndAt.Before(cycleEnd) {
// This should never happen.
panic(fmt.Errorf("block bounds [%s, %s] outside of cycle %s", blockStartAt, blockEndAt, cycleEnd))
}
defer builder.close() // TODO: handle error

var (
done bool

firstUncommittedRec *kgo.Record
checkpointRec *kgo.Record
done bool
commitRec *kgo.Record
lastRec *kgo.Record
blockEndAt = cycleEnd.Truncate(b.cfg.ConsumeInterval)
blockMax = blockEndAt.UnixMilli()
)
for !done {
// Limit time client waits for a new batch. Otherwise, the client will hang if it lands on an inactive partition.
ctx1, cancel := context.WithTimeout(ctx, 5*time.Second)
fetches := b.kafkaClient.PollFetches(ctx1)
cancel()

if fetches.IsClientClosed() {
level.Warn(b.logger).Log("msg", "client closed when fetching records")
return nil
return lag, seenTillTs, lastBlockMax, nil
}

fetches.EachError(func(_ string, part int32, err error) {
@@ -351,54 +417,44 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
// Stop consuming after we reached the cycleEnd marker.
// NOTE: the timestamp of the record is when the record was produced relative to distributor's time.
if rec.Timestamp.After(cycleEnd) {
if firstUncommittedRec == nil {
firstUncommittedRec = rec
}
done = true
break
}

// TODO(v): refactor block bounds check into a state machine:
// During catching up, if the builder skipped samples, roll back the offset to the first uncommitted record after compacting and starting the next block
if rec.Timestamp.Before(blockStartAt) || rec.Timestamp.After(blockEndAt) {
// When BB is first deployed or if it is lagging behind, then it might be consuming data from too far
// in the past. In that case, if we try to consume it all at once, it can overwhelm the system.
// So we break this into multiple block building cycles.
if rec.Timestamp.After(blockEndAt) {
if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
return fmt.Errorf("compact tsdb builder: %w", err)
}
}
blockStartAt, blockEndAt = blockBounds(rec.Timestamp, b.cfg.ConsumeInterval)
}

level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", blockStartAt, "bmax", blockEndAt)
level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "last_bmax", lastBlockMax, "bmax", blockMax)

recProcessedBefore := false // TODO: get this from kafka commit
blockMin, blockMax := blockStartAt.UnixMilli(), blockEndAt.UnixMilli()
allSamplesProcessed, err := builder.process(ctx, rec, blockMin, blockMax, recProcessedBefore)
recProcessedBefore := rec.Timestamp.UnixMilli() <= seenTillTs
allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore)
if err != nil {
level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err)
continue
// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
// TODO(codesome): add metric
} else if allSamplesProcessed {
// TODO(v): only advance checkpoint when all previously seen records from this cycle were processed
checkpointRec = rec
} else {
if firstUncommittedRec == nil {
firstUncommittedRec = rec
}
}
if !allSamplesProcessed && commitRec == nil {
// If block builder restarts, it will start consuming from the record after this from kafka.
// So the commit record should be the last record that was fully processed and not the
// first record that was not fully processed.
commitRec = lastRec
}
lastRec = rec
}
}

if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
// TODO(codesome): add metric
return err
return lag, seenTillTs, lastBlockMax, err
}

if commitRec == nil {
// All samples in all records were processed. We can commit the last record's offset.
commitRec = lastRec
}

// TODO(codesome): Make sure all the blocks have been shipped before committing the offset.
return b.finalizePartition(ctx, firstUncommittedRec, checkpointRec)
if lastRec != nil {
retSeenTillTs = lastRec.Timestamp.UnixMilli()
}
return lag, retSeenTillTs, blockMax, commitRecord(ctx, b.logger, b.kafkaClient, b.cfg.Kafka.Topic, commitRec, lastRec, blockMax)
}

func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string) blockUploader {
@@ -427,39 +483,77 @@ func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string)
}
}

func (b *BlockBuilder) finalizePartition(ctx context.Context, uncommittedRec, checkpointRec *kgo.Record) error {
// If there is an uncommitted record, rewind the client to the record's offset to consume it on the next cycle.
if uncommittedRec != nil {
part := uncommittedRec.Partition
b.kafkaClient.SetOffsets(map[string]map[int32]kgo.EpochOffset{
b.cfg.Kafka.Topic: {
part: {
Epoch: uncommittedRec.LeaderEpoch,
Offset: uncommittedRec.Offset - 1,
},
},
})
func commitRecord(ctx context.Context, l log.Logger, kc *kgo.Client, topic string, commitRec, lastReadRec *kgo.Record, blockEnd int64) error {
if commitRec == nil {
return nil
}
// Rewind the offset to the commit record so that when the partition is read again by this
// block builder, it starts at the commit point.
kc.SetOffsets(map[string]map[int32]kgo.EpochOffset{
topic: {
commitRec.Partition: {
Epoch: commitRec.LeaderEpoch,
Offset: commitRec.Offset,
},
},
})

if checkpointRec != nil {
// TODO(codesome): persist uncommittedRec with checkpoint's metadata
err := b.kafkaClient.CommitRecords(ctx, checkpointRec)
if err != nil {
// TODO(codesome): add metric
return err
ctx = kgo.PreCommitFnContext(ctx, func(req *kmsg.OffsetCommitRequest) error {
meta := marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastReadRec.Timestamp.UnixMilli(), blockEnd)
for ti := range req.Topics {
if req.Topics[ti].Topic != topic {
continue
}
for pi := range req.Topics[ti].Partitions {
if req.Topics[ti].Partitions[pi].Partition == commitRec.Partition {
req.Topics[ti].Partitions[pi].Metadata = &meta
}
}
}
level.Info(l).Log("commit request", fmt.Sprintf("%+v", req))
return nil
})

if err := kc.CommitRecords(ctx, commitRec); err != nil {
return fmt.Errorf("commit record with part %d, offset %d: %w", commitRec.Partition, commitRec.Offset, err)
}

level.Debug(l).Log("msg", "successfully committed to Kafka", "part", commitRec.Partition, "offset", commitRec.Offset)

return nil
}

func blockBounds(t time.Time, length time.Duration) (time.Time, time.Time) {
maxt := t.Truncate(length)
if maxt.Before(t) {
maxt = maxt.Add(length)
const (
kafkaCommitMetaV1 = 1
)

// commitRecTs: timestamp of the record which was committed (and not the commit time).
// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs).
// blockEnd: timestamp of the block end in this cycle.
func marshallCommitMeta(commitRecTs, lastRecTs, blockEnd int64) string {
return fmt.Sprintf("%d,%d,%d,%d", kafkaCommitMetaV1, commitRecTs, lastRecTs, blockEnd)
}

// commitRecTs: timestamp of the record which was committed (and not the commit time).
// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs).
// blockEnd: timestamp of the block end in this cycle.
func unmarshallCommitMeta(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) {
var (
version int
s string
)
_, err = fmt.Sscanf(meta, "%d,%s", &version, &s)
if err != nil {
return
}

switch version {
case kafkaCommitMetaV1:
_, err = fmt.Sscanf(s, "%d,%d,%d", &commitRecTs, &lastRecTs, &blockEnd)
default:
err = fmt.Errorf("unsupported commit meta version %d", version)
}
mint := maxt.Add(-length)
return mint, maxt
return
}

type Config struct {
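As a companion to the catch-up path added to nextConsumeCycle above: when the committed record's timestamp is more than 1.5x the consume interval behind cycleEnd, the cycle is split into sub-cycles, starting one interval (plus the configured buffer) after the truncated commit time and stepping by the interval until cycleEnd. The sketch below approximates that boundary computation; the function name and the interval/buffer/timestamp values are made up for illustration, and the real loop additionally threads lag, seenTillTs, and lastBlockEnd from one consumePartition call into the next.

package main

import (
	"fmt"
	"time"
)

// subCycleEnds approximates the slicing done in nextConsumeCycle when the partition is
// lagging: start one interval (plus buffer) after the committed record's truncated
// timestamp, step by the interval, and finish exactly at cycleEnd.
func subCycleEnds(commitRecTime, cycleEnd time.Time, interval, buffer time.Duration) []time.Time {
	var ends []time.Time
	for ce := commitRecTime.Truncate(interval).Add(interval + buffer); ce.Before(cycleEnd); ce = ce.Add(interval) {
		ends = append(ends, ce)
	}
	return append(ends, cycleEnd)
}

func main() {
	// Hypothetical values: a 1h consume interval, 15m buffer, and a commit roughly 3h behind.
	commitRecTime := time.Date(2024, 6, 12, 9, 20, 0, 0, time.UTC)
	cycleEnd := time.Date(2024, 6, 12, 12, 30, 0, 0, time.UTC)
	for _, ce := range subCycleEnds(commitRecTime, cycleEnd, time.Hour, 15*time.Minute) {
		fmt.Println(ce) // each sub-cycle end would be handled by one consumePartition call
	}
}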