From c38a80d88ab25b62bd8e417d831c2f0a90bac782 Mon Sep 17 00:00:00 2001 From: uturkarslan Date: Sun, 4 Feb 2024 12:05:54 +0100 Subject: [PATCH] Add error recording --- .../commons/otel/kafka/kafka_consumer_interceptor.go | 12 +++++++++--- apps/golang/kafkaconsumer/consumer/consumer.go | 3 ++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go b/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go index 3ef4d9e..5c21764 100644 --- a/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go +++ b/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go @@ -58,7 +58,7 @@ func (k *KafkaConsumer) Intercept( consumerGroup string, ) ( context.Context, - func(), + func(err error), ) { consumeStartTime := time.Now() @@ -82,11 +82,17 @@ func (k *KafkaConsumer) Intercept( ) // Record consumer latency - endConsume := func() { + endConsume := func( + err error, + ) { elapsedTime := float64(time.Since(consumeStartTime)) / float64(time.Millisecond) + attrs := semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup) + if err != nil { + attrs = append(attrs, semconv.ErrorType.String(err.Error())) + } k.latency.Record(ctx, elapsedTime, metric.WithAttributes( - semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup)..., + attrs..., )) span.End() } diff --git a/apps/golang/kafkaconsumer/consumer/consumer.go b/apps/golang/kafkaconsumer/consumer/consumer.go index 2cdab4a..bcf75ec 100644 --- a/apps/golang/kafkaconsumer/consumer/consumer.go +++ b/apps/golang/kafkaconsumer/consumer/consumer.go @@ -203,7 +203,6 @@ func (g *groupHandler) consumeMessage( // Create consumer span (parent) ctx := context.Background() ctx, endConsume := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId) - defer endConsume() // Parse name out of the message name := string(msg.Value) @@ -214,6 +213,7 @@ func (g *groupHandler) consumeMessage( err := g.storeIntoDb(ctx, name) if err != nil { g.logger.Log(logrus.ErrorLevel, ctx, name, "Consuming message is failed.") + endConsume(err) return nil } @@ -221,6 +221,7 @@ func (g *groupHandler) consumeMessage( session.MarkMessage(msg, "") g.logger.Log(logrus.InfoLevel, ctx, name, "Consuming message is succeeded.") + endConsume(nil) return nil }