From a67edcdd73f2a5f39e2478d1d30e1d3f401863d3 Mon Sep 17 00:00:00 2001 From: uturkarslan Date: Sun, 4 Feb 2024 12:48:13 +0100 Subject: [PATCH] Fix consume context loss issue --- .../otel/kafka/kafka_consumer_interceptor.go | 60 ++++++++++++------- .../golang/kafkaconsumer/consumer/consumer.go | 8 +-- 2 files changed, 42 insertions(+), 26 deletions(-) diff --git a/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go b/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go index e7b28e7..2549a74 100644 --- a/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go +++ b/apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go @@ -58,9 +58,15 @@ func (k *KafkaConsumer) Intercept( consumerGroup string, ) ( context.Context, - func(err error), + *KafkaConsumeTelemetryContext, ) { - consumeStartTime := time.Now() + // Instantiate Kafka consume telemetry context + kctCtx := &KafkaConsumeTelemetryContext{ + startTime: time.Now(), + latency: k.latency, + msg: msg, + consumerGroup: consumerGroup, + } // Get tracing info from message headers := propagation.MapCarrier{} @@ -81,25 +87,35 @@ func (k *KafkaConsumer) Intercept( trace.WithAttributes(spanAttrs...), ) - time.Sleep(time.Millisecond * 50) - k.latency.Record(ctx, float64(time.Millisecond*50)) - - // Record consumer latency - endConsume := func( - err error, - ) { - elapsedTime := float64(time.Since(consumeStartTime)) / float64(time.Millisecond) - fmt.Println(elapsedTime) - attrs := semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup) - if err != nil { - attrs = append(attrs, semconv.ErrorType.String(err.Error())) - } - // k.latency.Record(ctx, elapsedTime, - // metric.WithAttributes( - // attrs..., - // )) - span.End() - } + // Add context and started span to Kafka consume telemetry context + kctCtx.ctx = ctx + kctCtx.span = span + + return ctx, kctCtx +} + +type KafkaConsumeTelemetryContext struct { + startTime time.Time + latency metric.Float64Histogram + msg *sarama.ConsumerMessage + consumerGroup string - return ctx, endConsume + ctx context.Context + span trace.Span +} + +func (k *KafkaConsumeTelemetryContext) End( + err error, +) { + elapsedTime := float64(time.Since(k.startTime)) / float64(time.Millisecond) + + attrs := semconv.WithMessagingKafkaConsumerAttributes(k.msg, k.consumerGroup) + if err != nil { + attrs = append(attrs, semconv.ErrorType.String(err.Error())) + } + k.latency.Record(k.ctx, elapsedTime, + metric.WithAttributes( + attrs..., + )) + k.span.End() } diff --git a/apps/golang/kafkaconsumer/consumer/consumer.go b/apps/golang/kafkaconsumer/consumer/consumer.go index bcf75ec..d4e0708 100644 --- a/apps/golang/kafkaconsumer/consumer/consumer.go +++ b/apps/golang/kafkaconsumer/consumer/consumer.go @@ -200,9 +200,9 @@ func (g *groupHandler) consumeMessage( msg *sarama.ConsumerMessage, ) error { - // Create consumer span (parent) + // Create Kafka consume telemetry context ctx := context.Background() - ctx, endConsume := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId) + ctx, kctCtx := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId) // Parse name out of the message name := string(msg.Value) @@ -213,7 +213,7 @@ func (g *groupHandler) consumeMessage( err := g.storeIntoDb(ctx, name) if err != nil { g.logger.Log(logrus.ErrorLevel, ctx, name, "Consuming message is failed.") - endConsume(err) + kctCtx.End(err) return nil } @@ -221,7 +221,7 @@ func (g *groupHandler) consumeMessage( session.MarkMessage(msg, "") g.logger.Log(logrus.InfoLevel, ctx, name, "Consuming message is succeeded.") - endConsume(nil) + kctCtx.End(nil) return nil }