diff --git a/pkg/blockbuilder/blockbuilder.go b/pkg/blockbuilder/blockbuilder.go
index 5e72507424b..8c834c05472 100644
--- a/pkg/blockbuilder/blockbuilder.go
+++ b/pkg/blockbuilder/blockbuilder.go
@@ -274,11 +274,72 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
 		return nil
 	}
 
-	// TODO(v): rebalancing can happen between the calls to consumePartitions; if that happens, the instance may loose
+	// TODO(v): rebalancing can happen between the calls to consumePartition; if that happens, the instance may lose
 	// the ownership of some of its partitions
 	// TODO(v): test for this case
+
+	commitOffsets, err := kadm.NewClient(b.kafkaClient).ListCommittedOffsets(ctx, b.cfg.Kafka.Topic)
+	if err != nil {
+		return fmt.Errorf("get committed offsets: %w", err)
+	}
 	for part, lag := range parts {
-		err := b.consumePartitions(ctx, cycleEnd, part, lag)
+		// We look at the commit offset timestamp to determine how far we are lagging behind
+		// relative to the cycleEnd, and consume the partition in parts accordingly.
+		offset, ok := commitOffsets.Lookup(b.cfg.Kafka.Topic, part)
+		if ok {
+			commitTime := time.UnixMilli(offset.Timestamp)
+			if cycleEnd.Sub(commitTime) > 3*b.cfg.ConsumeInterval/2 {
+				// The commit timestamp is lagging behind the cycleEnd by more than 1.5 times the
+				// consume interval (the extra half interval acts as a buffer). We need to consume
+				// the partition in parts instead of all at once.
+				cycleEndStartAt := commitTime.Truncate(b.cfg.ConsumeInterval).Add(b.cfg.ConsumeInterval + b.cfg.ConsumeIntervalBuffer)
+
+				// Iterate through all the cycle ends, starting from the first one after commitTime, up to the cycleEnd.
+				for ce := cycleEndStartAt; cycleEnd.Sub(ce) >= 0; ce = ce.Add(b.cfg.ConsumeInterval) {
+					// TODO: Get lastBlockEndAt and seenTs from commit metadata
+					lastBlockEndAt := time.Now()
+					seenTs := int64(0) // Kafka record timestamp till which records were processed before.
+					// TODO(codesome): resume from the last commit point. See how to do it.
+					// TODO(codesome): since we will resume from last commit, the lag might be wrong.
+					// consumePartition() should ideally return the lag point for the commit record.
+					// Or we can get the updated lag from kafka instead.
+					lag, err = b.consumePartition(ctx, part, lag, time.UnixMilli(seenTs), lastBlockEndAt, ce)
+					if err != nil {
+						level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
+					}
+					if ce.Compare(cycleEnd) != 0 && ce.Add(b.cfg.ConsumeInterval).After(cycleEnd) {
+						ce = cycleEnd
+					}
+
+					// Refresh the commit offsets.
+					commitOffsets, err = kadm.NewClient(b.kafkaClient).ListCommittedOffsets(ctx, b.cfg.Kafka.Topic)
+					if err != nil {
+						return fmt.Errorf("get committed offsets: %w", err)
+					}
+					offset, ok = commitOffsets.Lookup(b.cfg.Kafka.Topic, part)
+					if !ok {
+						// We expect a commit here. It is an error if there is none.
+						return fmt.Errorf("commit offset not found for topic %q partition %d", b.cfg.Kafka.Topic, part)
+					}
+				}
+
+				// Make sure to unblock rebalance of the group after the partition was consumed AND after we (potentially) committed
+				// this partition's offset to the group.
+				// TODO(v): test for this case
+				b.kafkaClient.AllowRebalance()
+				continue
+			}
+		}
+
+		// Either we did not find a commit offset or we are not lagging behind by
+		// more than 1.5 times the consume interval.
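+		// In that case, consume everything up to the cycleEnd in a single consumePartition call.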
+		lastBlockEndAt := cycleEnd.Truncate(b.cfg.ConsumeInterval).Add(-b.cfg.ConsumeInterval)
+		seenTs := int64(0) // Kafka record timestamp till which records were processed before.
+		if ok {
+			// TODO(codesome): get lastBlockEndAt and seenOffset from commit metadata
+		}
+
+		_, err = b.consumePartition(ctx, part, lag, time.UnixMilli(seenTs), lastBlockEndAt, cycleEnd)
 		if err != nil {
 			level.Error(b.logger).Log("msg", "failed to consume partition", "err", err, "part", part)
 		}
@@ -291,7 +352,18 @@ func (b *BlockBuilder) nextConsumeCycle(ctx context.Context, cycleEnd time.Time)
 	return nil
 }
 
-func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time, part int32, lag int64) (err error) {
+// consumePartition consumes records from the given partition until the cycleEnd timestamp.
+// If the partition is lagging behind, the caller of consumePartition needs to take care of
+// calling consumePartition in parts.
+// consumePartition returns the updated lag after consuming the partition.
+func (b *BlockBuilder) consumePartition(
+	ctx context.Context,
+	part int32,
+	lag int64,
+	seenTillTs, // Kafka record timestamp till which records were processed before.
+	lastBlockEnd, // blockEndAt associated with the previous commit
+	cycleEnd time.Time,
+) (_ int64, err error) {
 	// TopicPartition to resume consuming on this iteration.
 	// Note: pause/resume is a client-local state. On restart or a crash, the client will be assigned its share of partitions,
 	// during consumer group's rebalancing, and it will continue consuming as usual.
@@ -306,29 +378,24 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
 	}(time.Now())
 
 	builder := b.tsdbBuilder()
-	defer builder.close()
-
-	blockStartAt, blockEndAt := blockBounds(cycleEnd, b.cfg.ConsumeInterval)
-	if blockEndAt.Before(cycleEnd) {
-		// This should never happen.
-		panic(fmt.Errorf("block bounds [%s, %s] outside of cycle %s", blockStartAt, blockEndAt, cycleEnd))
-	}
+	defer builder.close() // TODO: handle error
 
 	var (
-		done bool
-
+		done                bool
 		firstUncommittedRec *kgo.Record
 		checkpointRec       *kgo.Record
+		lastRec             *kgo.Record
+		lastBlockMax        = lastBlockEnd.UnixMilli()
+		blockMax            = cycleEnd.Truncate(b.cfg.ConsumeInterval).UnixMilli()
 	)
 	for !done {
 		// Limit time client waits for a new batch. Otherwise, the client will hang if it lands on an inactive partition.
 		ctx1, cancel := context.WithTimeout(ctx, 5*time.Second)
 		fetches := b.kafkaClient.PollFetches(ctx1)
 		cancel()
-
 		if fetches.IsClientClosed() {
 			level.Warn(b.logger).Log("msg", "client closed when fetching records")
-			return nil
+			return lag, nil
 		}
 
 		fetches.EachError(func(_ string, part int32, err error) {
@@ -358,47 +425,38 @@ func (b *BlockBuilder) consumePartitions(ctx context.Context, cycleEnd time.Time
 				break
 			}
 
-			// TODO(v): refactor block bounds check into a state machine:
-			// During catching up, if builder skipped samples, roll back the offset to first uncommited after compacting and starting next block
-			if rec.Timestamp.Before(blockStartAt) || rec.Timestamp.After(blockEndAt) {
-				// When BB is first deployed or if it is lagging behind, then it might consuming data from too much
-				// in the past. In which case if we try to consume all at once, it can overwhelm the system.
-				// So we break this into multiple block building cycles.
-				if rec.Timestamp.After(blockEndAt) {
-					if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
-						return fmt.Errorf("compact tsdb builder: %w", err)
-					}
-				}
-				blockStartAt, blockEndAt = blockBounds(rec.Timestamp, b.cfg.ConsumeInterval)
-			}
+			lastRec = rec
 
-			level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", blockStartAt, "bmax", blockEndAt)
+			level.Debug(b.logger).Log("msg", "process record", "offset", rec.Offset, "rec", rec.Timestamp, "bmin", lastBlockMax, "bmax", blockMax)
 
-			recProcessedBefore := false // TODO: get this from kafka commit
-			blockMin, blockMax := blockStartAt.UnixMilli(), blockEndAt.UnixMilli()
-			allSamplesProcessed, err := builder.process(ctx, rec, blockMin, blockMax, recProcessedBefore)
+			recProcessedBefore := seenTillTs.After(rec.Timestamp)
+			allSamplesProcessed, err := builder.process(ctx, rec, lastBlockMax, blockMax, recProcessedBefore)
 			if err != nil {
 				level.Error(b.logger).Log("msg", "failed to process record", "part", part, "key", string(rec.Key), "err", err)
 				// TODO(codesome): do we just ignore this? What if it was Mimir's issue and this leading to data loss?
 				// TODO(codesome): add metric
-			} else if allSamplesProcessed {
-				// TODO(v): only advance checkpoint when all previously seen records from this cycle were processed
+			}
+			if !allSamplesProcessed && checkpointRec == nil {
+				// The first record whose samples were not all processed becomes the checkpoint/commit point.
 				checkpointRec = rec
-			} else {
-				if firstUncommittedRec == nil {
-					firstUncommittedRec = rec
-				}
+			}
+			if firstUncommittedRec == nil {
+				firstUncommittedRec = rec
 			}
 		}
 	}
 
 	if err := builder.compactAndUpload(ctx, b.blockUploaderForUser); err != nil {
 		// TODO(codesome): add metric
-		return err
+		return lag, err
+	}
+
+	if checkpointRec == nil {
+		// All samples in all records were processed. We can commit the last record's offset.
+		checkpointRec = lastRec
 	}
 
-	// TODO(codesome): Make sure all the blocks have been shipped before committing the offset.
-	return b.finalizePartition(ctx, firstUncommittedRec, checkpointRec)
+	return lag, b.finalizePartition(ctx, firstUncommittedRec, checkpointRec)
 }
 
 func (b *BlockBuilder) blockUploaderForUser(ctx context.Context, userID string) blockUploader {
diff --git a/pkg/blockbuilder/blockbuilder_test.go b/pkg/blockbuilder/blockbuilder_test.go
index 804fd81eaff..d1eab098382 100644
--- a/pkg/blockbuilder/blockbuilder_test.go
+++ b/pkg/blockbuilder/blockbuilder_test.go
@@ -48,15 +48,7 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
 	for i := int64(0); i < 10; i++ {
 		ts := testEpoch.Add(time.Duration(i/5) * time.Hour)
 		val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil)
-		rec := &kgo.Record{
-			Timestamp: ts,
-			Key:       []byte(userID),
-			Value:     val,
-			Topic:     testTopic,
-			Partition: int32(i % numPartitions), // samples in this batch are split between N partitions
-		}
-		produceResult := writeClient.ProduceSync(ctx, rec)
-		require.NoError(t, produceResult.FirstErr())
+		produceRecord(ctx, t, writeClient, ts, testTopic, int32(i%numPartitions), userID, val)
 	}
 
 	cfg := Config{
@@ -117,17 +109,9 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		ts := testEpoch.Add(3 * time.Hour)
 		val := createWriteRequest(t, floatSample(ts.UnixMilli()), nil)
-		rec := &kgo.Record{
-			Timestamp: ts,
-			Key:       []byte(userID),
-			Value:     val,
-			Topic:     testTopic,
-			Partition: 1, // these samples are only for first partition
-		}
-		produceResult := writeClient.ProduceSync(ctx, rec)
-		require.NoError(t, produceResult.FirstErr())
-
-		lastProducedOffest = produceResult[0].Record.Offset
+		// these samples are only for first partition
+		pr := produceRecord(ctx, t, writeClient, ts, testTopic, 1, userID, val)
+		lastProducedOffest = pr[0].Record.Offset
 	}
 
 	cycleEnd := testEpoch.Add(4 * time.Hour)
@@ -148,6 +132,19 @@ func TestBlockBuilder_BuildBlocks(t *testing.T) {
 	require.Equal(t, lastProducedOffest+1, offset.Offset) // +1 because lastProducedOffset points at already consumed record
 }
 
+func produceRecord(ctx context.Context, t *testing.T, writeClient *kgo.Client, ts time.Time, topicName string, partitionID int32, userID string, content []byte) kgo.ProduceResults {
+	rec := &kgo.Record{
+		Timestamp: ts,
+		Key:       []byte(userID),
+		Value:     content,
+		Topic:     topicName,
+		Partition: partitionID,
+	}
+	produceResult := writeClient.ProduceSync(ctx, rec)
+	require.NoError(t, produceResult.FirstErr())
+	return produceResult
+}
+
 func newKafkaProduceClient(t *testing.T, addrs ...string) *kgo.Client {
 	writeClient, err := kgo.NewClient(
 		kgo.SeedBrokers(addrs...),
diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/tsdb.go
index d28c24f32f0..b02fa3b9ed9 100644
--- a/pkg/blockbuilder/tsdb.go
+++ b/pkg/blockbuilder/tsdb.go
@@ -28,7 +28,7 @@ import (
 )
 
 type builder interface {
-	process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error)
+	process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error)
 	compactAndUpload(ctx context.Context, blockUploaderForUser func(context.Context, string) blockUploader) error
 	close() error
 }
@@ -65,7 +65,7 @@ func newTSDBBuilder(logger log.Logger, limits *validation.Overrides, blocksStora
 // where the sample was not put in the TSDB because it was discarded or was already processed before.
 // lastEnd: "end" time of the previous block building cycle.
 // currEnd: end time of the block we are looking at right now.
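+// lastBlockMax and blockMax are the Unix milliseconds of lastEnd and currEnd respectively.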
-func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, blockMax int64, recordProcessedBefore bool) (_ bool, err error) {
+func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, lastBlockMax, blockMax int64, recordProcessedBefore bool) (_ bool, err error) {
 	userID := string(rec.Key)
 
 	req := mimirpb.WriteRequest{}
@@ -119,7 +119,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl
 			allSamplesProcessed = false
 			continue
 		}
-		if recordProcessedBefore && s.TimestampMs < blockMin {
+		if recordProcessedBefore && s.TimestampMs < lastBlockMax {
 			// This sample was already processed in the previous cycle.
 			continue
 		}
@@ -147,7 +147,7 @@ func (b *tsdbBuilder) process(ctx context.Context, rec *kgo.Record, blockMin, bl
 			allSamplesProcessed = false
 			continue
 		}
-		if recordProcessedBefore && h.Timestamp < blockMin {
+		if recordProcessedBefore && h.Timestamp < lastBlockMax {
 			// This sample was already processed in the previous cycle.
 			continue
 		}