From 8895e57ad38381ea15fbb1ee2238aed7f84686a7 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 18 Dec 2024 09:25:46 -0800 Subject: [PATCH 1/2] Log Kafka offset. --- processes/consumer/configs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/processes/consumer/configs.go b/processes/consumer/configs.go index 87594556a..203b10b00 100644 --- a/processes/consumer/configs.go +++ b/processes/consumer/configs.go @@ -2,6 +2,7 @@ package consumer import ( "context" + "log/slog" "sync" "github.com/artie-labs/transfer/lib/artie" @@ -45,6 +46,8 @@ func commitOffset(ctx context.Context, topic string, partitionsToOffset map[stri if err := topicToConsumer.Get(topic).CommitMessages(ctx, *msg.KafkaMsg); err != nil { return err } + + slog.Info("Committing offset", slog.String("topic", topic), slog.Int("partition", msg.KafkaMsg.Partition), slog.Int64("offset", msg.KafkaMsg.Offset)) } if msg.PubSub != nil { From 1d5117f1e0cae2d61fbc770427438fe23dc15350 Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Wed, 18 Dec 2024 10:19:24 -0800 Subject: [PATCH 2/2] Log. --- processes/consumer/configs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processes/consumer/configs.go b/processes/consumer/configs.go index 203b10b00..889820407 100644 --- a/processes/consumer/configs.go +++ b/processes/consumer/configs.go @@ -47,7 +47,7 @@ func commitOffset(ctx context.Context, topic string, partitionsToOffset map[stri return err } - slog.Info("Committing offset", slog.String("topic", topic), slog.Int("partition", msg.KafkaMsg.Partition), slog.Int64("offset", msg.KafkaMsg.Offset)) + slog.Info("Successfully committed Kafka offset", slog.String("topic", topic), slog.Int("partition", msg.KafkaMsg.Partition), slog.Int64("offset", msg.KafkaMsg.Offset)) } if msg.PubSub != nil {