diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go index 5e72507424b..8313bad9891 100644 --- a/pkg/blockbuilder/blockbuilder.go +++ b/pkg/blockbuilder/blockbuilder.go @@ -17,6 +17,7 @@ import ( "github.com/thanos-io/objstore" "github.com/twmb/franz-go/pkg/kadm" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/plugin/kprom" "github.com/grafana/mimir/pkg/storage/bucket" @@ -59,7 +60,7 @@ func New( return newTSDBBuilder(b.logger, b.limits, b.cfg.BlocksStorageConfig) } - bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, reg) + bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "blockbuilder", logger, reg) if err != nil { return nil, fmt.Errorf("failed to create the bucket client: %w", err) } @@ -212,7 +213,10 @@ func (b *BlockBuilder) running(ctx context.Context) error { for { select { case cycleEnd := <-time.After(waitTime): - _ = b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second)) + err := b.nextConsumeCycle(ctx, cycleEnd.Add(-time.Second)) + if err != nil { + level.Error(b.logger).Log("msg", "consume cycle failed", "cycle_end", cycleEnd, "err", err) + } // If we took more than consumptionItvl time to consume the records, this // will immediately start the next consumption. @@ -274,14 +278,68 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time) return nil } - // TODO(v): rebalancing can happen between the calls to consumePartitions; if that happens, the instance may loose + // TODO(v): rebalancing can happen between the calls to consumePartition; if that happens, the instance may loose // the ownership of some of its partitions // TODO(v): test for this case + + offsets, err := kadm.NewClient(b.kafkaClient).FetchOffsetsForTopics(ctx, b.cfg.Kafka.ConsumerGroup, b.cfg.Kafka.Topic) + if err != nil { + return fmt.Errorf("fetch offsets for topics: %w", err) + } for part, lag := range parts { - err := b.consumePartitions(ctx, cycleEnd, part, lag) - if err != nil { - level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part) + // We look at the commit offset timestamp to determine how far behind we are lagging + // relative to the cycleEnd. We consume the partition in parts accordingly. + offset, ok := offsets.Lookup(b.cfg.Kafka.Topic, part) + var commitRecTs, seenTillTs, lastBlockEnd int64 + var commitRecTime time.Time + if ok { + level.Debug(b.logger).Log("part", offset.Partition, "offset", offset.At, "meta", offset.Metadata) + commitRecTs, seenTillTs, lastBlockEnd, err = unmarshallCommitMeta(offset.Metadata) + if err != nil { + return err + } + commitRecTime = time.UnixMilli(commitRecTs) + } else { + level.Warn(b.logger).Log("msg", "didn't find partition", "part", part, "offsets", fmt.Sprintf("%+v", offsets)) + } + + lagging := ok && cycleEnd.Sub(commitRecTime) > 3*b.cfg.ConsumeInterval/2 + if !lagging { + // Either we did not find a commit offset or we are not lagging behind by + // more than 1.5 times the consume interval. + // When there is no kafka commit, we play safe and assume seenTillTs and + // lastBlockEnd was 0 to not discard any samples unnecessarily. 
+ _, _, _, err = b.consumePartition(ctx, part, lag, seenTillTs, lastBlockEnd, cycleEnd) + if err != nil { + level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part) + } + + // Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed + // this partition's offset to the group. + // TODO(v): test for this case + b.kafkaClient.AllowRebalance() + continue } + + // We are lagging behind. We need to consume the partition in parts. + + // We iterate through all the cycleEnds starting from the first one after commit until the cycleEnd. + cycleEndStartAt := commitRecTime.Truncate(b.cfg.ConsumeInterval).Add(b.cfg.ConsumeInterval + b.cfg.ConsumeIntervalBuffer) + for ce := cycleEndStartAt; cycleEnd.Sub(ce) >= 0; ce = ce.Add(b.cfg.ConsumeInterval) { + // Instead of looking for the commit metadata for each iteration, we use the data returned by consumePartition + // in the next iteration. + lag, seenTillTs, lastBlockEnd, err = b.consumePartition(ctx, part, lag, seenTillTs, lastBlockEnd, ce) + if err != nil { + level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part) + return err + } + // If adding the ConsumeInterval takes it beyond the cycleEnd, we set it to the cycleEnd to not + // exit the loop without consuming until cycleEnd. + if ce.Compare(cycleEnd) != 0 && ce.Add(b.cfg.ConsumeInterval).After(cycleEnd) { + ce = cycleEnd + } + } + // Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed // this partition's offset to the group. // TODO(v): test for this case @@ -291,7 +349,21 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time) return nil } -func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time, part int32, lag int64) (err error) { +// consumePartition consumes records from the given partition until the cycleEnd timestamp. +// If the partition is lagging behind, the caller of consumePartition needs to take care of +// calling consumePartition in parts. +// consumePartition returns +// * retLag: updated lag after consuming the partition. +// * retSeenTillTs: timestamp of the last record processed (part of commit metadata). +// * retBlockMax: timestamp of the block end in this cycle (part of commit metadata). +func (b *BlockBuilder) consumePartition( + ctx context.Context, + part int32, + lag, + seenTillTs, // Kafka record timestamp till which records were processed before. + lastBlockMax int64, // blockEndAt associated with the previous commit + cycleEnd time.Time, +) (retLag, retSeenTillTs, retBlockMax int64, err error) { // TopicPartition to resume consuming on this iteration. // Note: pause/resume is a client-local state. On restart or a crash, the client will be assigned its share of partitions, // during consumer group's rebalancing, and it will continue consuming as usual. @@ -306,29 +378,23 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time }(time.Now()) builder := b.tsdbBuilder() - defer builder.close() - - blockStartAt, blockEndAt := blockBounds(cycleEnd, b.cfg.ConsumeInterval) - if blockEndAt.Before(cycleEnd) { - // This should never happen. 
- panic(fmt.Errorf("block bounds [%s, %s] outside of cycle %s", blockStartAt, blockEndAt, cycleEnd)) - } + defer builder.close() // TODO: handle error var ( - done bool - - firstUncommittedRec *kgo.Record - checkpointRec *kgo.Record + done bool + commitRec *kgo.Record + lastRec *kgo.Record + blockEndAt = cycleEnd.Truncate(b.cfg.ConsumeInterval) + blockMax = blockEndAt.UnixMilli() ) for !done { // Limit time client waits for a new batch. Otherwise, the client will hang if it lands on an inactive partition. ctx1, cancel := context.WithTimeout(ctx, 5*time.Second) fetches := b.kafkaClient.PollFetches(ctx1) cancel() - if fetches.IsClientClosed() { level.Warn(b.logger).Log("msg", "client closed when fetching records") - return nil + return lag, seenTillTs, lastBlockMax, nil } fetches.EachError(func(_ string, part int32, err error) { @@ -351,54 +417,44 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time // Stop consuming after we reached the cycleEnd marker. // NOTE: the timestamp of the record is when the record was produced relative to distributor's time. if rec.Timestamp.After(cycleEnd) { - if firstUncommittedRec == nil { - firstUncommittedRec = rec - } done = true break } - // TODO(v): refactor block bounds check into a state machine: - // During catching up, if builder skipped samples, roll back the offset to first uncommited after compacting and starting next block - if rec.Timestamp.Before(blockStartAt) || rec.Timestamp.After(blockEndAt) { - // When BB is first deployed or if it is lagging behind, then it might consuming data from too much - // in the past. In which case if we try to consume all at once, it can overwhelm the system. - // So we break this into multiple block building cycles. - if rec.Timestamp.After(blockEndAt) { - if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil { - return fmt.Errorf("compact tsdb builder: %w", err) - } - } - blockStartAt, blockEndAt = blockBounds(rec.Timestamp, b.cfg.ConsumeInterval) - } - - level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", blockStartAt, "bmax", blockEndAt) + level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "last_bmax", lastBlockMax, "bmax", blockMax) - recProcessedBefore := false // TODO: get this from kafka commit - blockMin, blockMax := blockStartAt.UnixMilli(), blockEndAt.UnixMilli() - allSamplesProcessed, err := builder.process(ctx, rec, blockMin, blockMax, recProcessedBefore) + recProcessedBefore := rec.Timestamp.UnixMilli() <= seenTillTs + allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore) if err != nil { level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err) + continue // TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss? // TODO(codesome): add metric - } else if allSamplesProcessed { - // TODO(v): only advance checkpoint when all previously seen records from this cycle were processed - checkpointRec = rec - } else { - if firstUncommittedRec == nil { - firstUncommittedRec = rec - } } + if !allSamplesProcessed && commitRec == nil { + // If block builder restarts, it will start consuming from the record after this from kafka. + // So the commit record should be the last record that was fully processed and not the + // first record that was not fully processed. 
+ commitRec = lastRec + } + lastRec = rec } } if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil { // TODO(codesome): add metric - return err + return lag, seenTillTs, lastBlockMax, err + } + + if commitRec == nil { + // All samples in all records were processed. We can commit the last record's offset. + commitRec = lastRec } - // TODO(codesome): Make sure all the blocks have been shipped before committing the offset. - return b.finalizePartition(ctx, firstUncommittedRec, checkpointRec) + if lastRec != nil { + retSeenTillTs = lastRec.Timestamp.UnixMilli() + } + return lag, retSeenTillTs, blockMax, commitRecord(ctx, b.logger, b.kafkaClient, b.cfg.Kafka.Topic, commitRec, lastRec, blockMax) } func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string) blockUploader { @@ -427,39 +483,77 @@ func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string) } } -func (b *BlockBuilder) finalizePartition(ctx context.Context, uncommittedRec, checkpointRec *kgo.Record) error { - // If there is an uncommitted record, rewind the client to the record's offset to consume it on the next cycle. - if uncommittedRec != nil { - part := uncommittedRec.Partition - b.kafkaClient.SetOffsets(map[string]map[int32]kgo.EpochOffset{ - b.cfg.Kafka.Topic: { - part: { - Epoch: uncommittedRec.LeaderEpoch, - Offset: uncommittedRec.Offset - 1, - }, - }, - }) +func commitRecord(ctx context.Context, l log.Logger, kc *kgo.Client, topic string, commitRec, lastReadRec *kgo.Record, blockEnd int64) error { + if commitRec == nil { + return nil } + // Rewind the offset to the commit record so that when the partition is read again by this + // block builder, it starts at the commit point. + kc.SetOffsets(map[string]map[int32]kgo.EpochOffset{ + topic: { + commitRec.Partition: { + Epoch: commitRec.LeaderEpoch, + Offset: commitRec.Offset, + }, + }, + }) - if checkpointRec != nil { - // TODO(codesome): persist uncommittedRec with checkpoint's metadata - err := b.kafkaClient.CommitRecords(ctx, checkpointRec) - if err != nil { - // TODO(codesome): add metric - return err + ctx = kgo.PreCommitFnContext(ctx, func(req *kmsg.OffsetCommitRequest) error { + meta := marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastReadRec.Timestamp.UnixMilli(), blockEnd) + for ti := range req.Topics { + if req.Topics[ti].Topic != topic { + continue + } + for pi := range req.Topics[ti].Partitions { + if req.Topics[ti].Partitions[pi].Partition == commitRec.Partition { + req.Topics[ti].Partitions[pi].Metadata = &meta + } + } } + level.Info(l).Log("commit request", fmt.Sprintf("%+v", req)) + return nil + }) + + if err := kc.CommitRecords(ctx, commitRec); err != nil { + return fmt.Errorf("commit record with part %d, offset %d: %w", commitRec.Partition, commitRec.Offset, err) } + level.Debug(l).Log("msg", "successfully committed to Kafka", "part", commitRec.Partition, "offset", commitRec.Offset) + return nil } -func blockBounds(t time.Time, length time.Duration) (time.Time, time.Time) { - maxt := t.Truncate(length) - if maxt.Before(t) { - maxt = maxt.Add(length) +const ( + kafkaCommitMetaV1 = 1 +) + +// commitRecTs: timestamp of the record which was comitted (and not the commit time). +// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs). +// blockEnd: timestamp of the block end in this cycle. 
+func marshallCommitMeta(commitRecTs, lastRecTs, blockEnd int64) string { + return fmt.Sprintf("%d,%d,%d,%d", kafkaCommitMetaV1, commitRecTs, lastRecTs, blockEnd) +} + +// commitRecTs: timestamp of the record which was comitted (and not the commit time). +// lastRecTs: timestamp of the last record processed (which will be >= commitRecTs). +// blockEnd: timestamp of the block end in this cycle. +func unmarshallCommitMeta(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) { + var ( + version int + s string + ) + _, err = fmt.Sscanf(meta, "%d,%s", &version, &s) + if err != nil { + return + } + + switch version { + case kafkaCommitMetaV1: + _, err = fmt.Sscanf(s, "%d,%d,%d", &commitRecTs, &lastRecTs, &blockEnd) + default: + err = fmt.Errorf("unsupported commit meta version %d", version) } - mint := maxt.Add(-length) - return mint, maxt + return } type Config struct { diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go index 804fd81eaff..e1a3d8d3ac9 100644 --- a/pkg/blockbuilder/blockbuilder_test.go +++ b/pkg/blockbuilder/blockbuilder_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -16,6 +17,7 @@ import ( mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb" "github.com/grafana/mimir/pkg/util/test" + "github.com/grafana/mimir/pkg/util/testkafka" "github.com/grafana/mimir/pkg/util/validation" ) @@ -48,15 +50,7 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) { for i := int64(0); i < 10; i++ { ts := testEpoch.Add(time.Duration(i/5) * time.Hour) val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil) - rec := &kgo.Record{ - Timestamp: ts, - Key: []byte(userID), - Value: val, - Topic: testTopic, - Partition: int32(i % numPartitions), // samples in this batch are split between N partitions - } - produceResult := writeClient.ProduceSync(ctx, rec) - require.NoError(t, produceResult.FirstErr()) + produceRecords(t, ctx, writeClient, ts, userID, testTopic, int32(i%numPartitions), val) } cfg := Config{ @@ -117,16 +111,8 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) { for i := 0; i < 10; i++ { ts := testEpoch.Add(3 * time.Hour) val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil) - rec := &kgo.Record{ - Timestamp: ts, - Key: []byte(userID), - Value: val, - Topic: testTopic, - Partition: 1, // these samples are only for first partition - } - produceResult := writeClient.ProduceSync(ctx, rec) - require.NoError(t, produceResult.FirstErr()) - + // these samples are only for first partition + produceResult := produceRecords(t, ctx, writeClient, ts, userID, testTopic, 1, val) lastProducedOffest = produceResult[0].Record.Offset } @@ -148,6 +134,19 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) { require.Equal(t, lastProducedOffest+1, offset.Offset) // +1 because lastProducedOffset points at already consumed record } +func produceRecords(t *testing.T, ctx context.Context, writeClient *kgo.Client, ts time.Time, userID, topic string, part int32, val []byte) kgo.ProduceResults { + rec := &kgo.Record{ + Timestamp: ts, + Key: []byte(userID), + Value: val, + Topic: topic, + Partition: part, // samples in this batch are split between N partitions + } + produceResult := writeClient.ProduceSync(ctx, rec) + require.NoError(t, produceResult.FirstErr()) + return produceResult +} + func newKafkaProduceClient(t *testing.T, addrs ...string) *kgo.Client { writeClient, err := kgo.NewClient( 
kgo.SeedBrokers(addrs...), @@ -175,3 +174,84 @@ func (t testTSDBBuilder) compactAndUpload(ctx context.Context, blockUploaderForU func (t testTSDBBuilder) close() error { return nil } + +func TestKafkaCommitMetaMarshalling(t *testing.T) { + v1 := int64(892734) + v2 := int64(598237948) + v3 := int64(340237948) + + o1, o2, o3, err := unmarshallCommitMeta(marshallCommitMeta(v1, v2, v3)) + require.NoError(t, err) + require.Equal(t, v1, o1) + require.Equal(t, v2, o2) + require.Equal(t, v3, o3) + + // Unsupported version + _, _, _, err = unmarshallCommitMeta("2,2,3,4") + require.Error(t, err) + require.Equal(t, "unsupported commit meta version 2", err.Error()) + + // Error parsing + _, _, _, err = unmarshallCommitMeta("1,3,4") + require.Error(t, err) +} + +func TestKafkaCommitMetadata(t *testing.T) { + const ( + testTopic = "test" + testGroup = "testgroup" + numRecs = 10 + ) + + ctx, cancel := context.WithCancelCause(context.Background()) + t.Cleanup(func() { cancel(errors.New("test done")) }) + + _, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 1, testTopic) + writeClient := newKafkaProduceClient(t, addr) + + for i := int64(0); i < numRecs; i++ { + val := createWriteRequest(t, floatSample(i), nil) + produceRecords(t, ctx, writeClient, time.Now(), "1", testTopic, 0, val) + } + + opts := []kgo.Opt{ + kgo.ClientID("1"), + kgo.SeedBrokers(addr), + kgo.DialTimeout(10 * time.Second), + kgo.ConsumeTopics(testTopic), + kgo.ConsumerGroup(testGroup), + kgo.DisableAutoCommit(), + } + kc, err := kgo.NewClient(opts...) + require.NoError(t, err) + + fetches := kc.PollFetches(ctx) + require.NoError(t, fetches.Err()) + + var recs []*kgo.Record + for it := fetches.RecordIter(); !it.Done(); { + recs = append(recs, it.Next()) + } + require.Len(t, recs, numRecs) + + commitRec := recs[numRecs/2] + lastRec := recs[numRecs-1] + blockEnd := time.Now().Truncate(1 * time.Hour).UnixMilli() + + err = commitRecord(ctx, log.NewNopLogger(), kc, testTopic, commitRec, lastRec, blockEnd) + require.NoError(t, err) + + // Checking the commit + offsets, err := kadm.NewClient(kc).FetchOffsetsForTopics(ctx, testGroup, testTopic) + require.NoError(t, err) + + offset, ok := offsets.Lookup(testTopic, 0) + require.True(t, ok) + require.Equal(t, kadm.Offset{ + Topic: testTopic, + Partition: 0, + At: commitRec.Offset + 1, + LeaderEpoch: commitRec.LeaderEpoch, + Metadata: marshallCommitMeta(commitRec.Timestamp.UnixMilli(), lastRec.Timestamp.UnixMilli(), blockEnd), + }, offset.Offset) +} diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go index d28c24f32f0..b02fa3b9ed9 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/tsdb.go @@ -28,7 +28,7 @@ import ( ) type builder interface { - process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error) + process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) compactAndUpload(ctx context.Context, blockUploaderForUser func(context.Context, string) blockUploader) error close() error } @@ -65,7 +65,7 @@ func newTSDBBuilder(logger log.Logger, limits *validation.Overrides, blocksStora // where the sample was not put in the TSDB because it was discarded or was already processed before. // lastEnd: "end" time of the previous block building cycle. // currEnd: end time of the block we are looking at right now. 
-func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error) { +func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) { userID := string(rec.Key) req := mimirpb.WriteRequest{} @@ -119,7 +119,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl allSamplesProcessed = false continue } - if recordProcessedBefore && s.TimestampMs < blockMin { + if recordProcessedBefore && s.TimestampMs < lastBlockMax { // This sample was already processed in the previous cycle. continue } @@ -147,7 +147,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl allSamplesProcessed = false continue } - if recordProcessedBefore && h.Timestamp < blockMin { + if recordProcessedBefore && h.Timestamp < lastBlockMax { // This sample was already processed in the previous cycle. continue }
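A minimal, self-contained sketch of the v1 commit-metadata encoding that `commitRecord` attaches to the consumer-group offset. It mirrors `marshallCommitMeta`/`unmarshallCommitMeta` from the patch (a version prefix followed by `commitRecTs`, `lastRecTs`, `blockEnd`); the package layout, function names, and sample values below are illustrative and not part of the change.

```go
package main

import "fmt"

const kafkaCommitMetaV1 = 1

// encode packs the three timestamps behind a version prefix, matching the
// "version,commitRecTs,lastRecTs,blockEnd" layout used in the patch.
func encode(commitRecTs, lastRecTs, blockEnd int64) string {
	return fmt.Sprintf("%d,%d,%d,%d", kafkaCommitMetaV1, commitRecTs, lastRecTs, blockEnd)
}

// decode reads the version first, then the payload. fmt.Sscanf's %s verb stops
// at whitespace, so the remaining comma-separated payload is captured as one token.
func decode(meta string) (commitRecTs, lastRecTs, blockEnd int64, err error) {
	var (
		version int
		rest    string
	)
	if _, err = fmt.Sscanf(meta, "%d,%s", &version, &rest); err != nil {
		return
	}
	switch version {
	case kafkaCommitMetaV1:
		_, err = fmt.Sscanf(rest, "%d,%d,%d", &commitRecTs, &lastRecTs, &blockEnd)
	default:
		err = fmt.Errorf("unsupported commit meta version %d", version)
	}
	return
}

func main() {
	meta := encode(892734, 598237948, 340237948)
	fmt.Println(meta) // 1,892734,598237948,340237948

	commitRecTs, lastRecTs, blockEnd, err := decode(meta)
	fmt.Println(commitRecTs, lastRecTs, blockEnd, err) // 892734 598237948 340237948 <nil>
}
```

Keeping the version as the first comma-separated field is what lets `unmarshallCommitMeta` reject metadata written by a future format (the `"2,2,3,4"` case in `TestKafkaCommitMetaMarshalling`) without attempting to parse the rest.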
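The other behavioural change worth spelling out is how a re-consumed record is deduplicated against the previous cycle. The sketch below restates that rule in isolation: `blockMax` is the current cycle's block end (`cycleEnd` truncated to the consume interval), `lastBlockMax` is the block end carried in the previous commit's metadata, and a sample is dropped only when its record was already seen (`rec.Timestamp <= seenTillTs`) and the sample falls before `lastBlockMax`. The first branch, skipping samples at or beyond `blockMax`, is assumed from the surrounding context that is not shown in the hunk; the helper and values are illustrative only.

```go
package main

import (
	"fmt"
	"time"
)

// acceptSample restates the dedup rule from tsdbBuilder.process as a pure function.
// recordProcessedBefore is true when the record's timestamp is <= the seenTillTs
// stored in the previous commit's metadata.
func acceptSample(sampleTs, lastBlockMax, blockMax int64, recordProcessedBefore bool) bool {
	if sampleTs >= blockMax {
		// Assumed from the unshown context: samples past the current block end are
		// left for a later cycle (and allSamplesProcessed is set to false).
		return false
	}
	if recordProcessedBefore && sampleTs < lastBlockMax {
		// Already persisted by the block built in the previous cycle.
		return false
	}
	return true
}

func main() {
	interval := time.Hour
	cycleEnd := time.Date(2024, 4, 1, 10, 15, 0, 0, time.UTC)

	// blockMax for this cycle, as in consumePartition: cycleEnd truncated to the interval.
	blockMax := cycleEnd.Truncate(interval).UnixMilli()
	// lastBlockMax would come from the previous commit's metadata; one interval earlier here.
	lastBlockMax := cycleEnd.Truncate(interval).Add(-interval).UnixMilli()

	old := time.Date(2024, 4, 1, 8, 30, 0, 0, time.UTC).UnixMilli()  // before lastBlockMax
	cur := time.Date(2024, 4, 1, 9, 30, 0, 0, time.UTC).UnixMilli()  // inside the current block
	next := time.Date(2024, 4, 1, 10, 5, 0, 0, time.UTC).UnixMilli() // at/after blockMax

	fmt.Println(acceptSample(old, lastBlockMax, blockMax, true))   // false: already in the previous block
	fmt.Println(acceptSample(cur, lastBlockMax, blockMax, true))   // true: belongs to this block
	fmt.Println(acceptSample(next, lastBlockMax, blockMax, false)) // false: deferred to a later cycle
}
```

This is why the rename from `blockMin` to `lastBlockMax` matters: the lower bound of the dedup window is no longer derived from the current block's bounds but from what the previous commit actually covered, so a record replayed after a restart cannot double-count samples that already made it into an uploaded block.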