Skip to content

Commit

Permalink
Fix consume context loss issue
Browse files Browse the repository at this point in the history
  • Loading branch information
utr1903 committed Feb 4, 2024
1 parent d18704a commit a67edcd
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 26 deletions.
60 changes: 38 additions & 22 deletions apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ func (k *KafkaConsumer) Intercept(
consumerGroup string,
) (
context.Context,
func(err error),
*KafkaConsumeTelemetryContext,
) {
consumeStartTime := time.Now()
// Instantiate Kafka consume telemetry context
kctCtx := &KafkaConsumeTelemetryContext{
startTime: time.Now(),
latency: k.latency,
msg: msg,
consumerGroup: consumerGroup,
}

// Get tracing info from message
headers := propagation.MapCarrier{}
Expand All @@ -81,25 +87,35 @@ func (k *KafkaConsumer) Intercept(
trace.WithAttributes(spanAttrs...),
)

time.Sleep(time.Millisecond * 50)
k.latency.Record(ctx, float64(time.Millisecond*50))

// Record consumer latency
endConsume := func(
err error,
) {
elapsedTime := float64(time.Since(consumeStartTime)) / float64(time.Millisecond)
fmt.Println(elapsedTime)
attrs := semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup)
if err != nil {
attrs = append(attrs, semconv.ErrorType.String(err.Error()))
}
// k.latency.Record(ctx, elapsedTime,
// metric.WithAttributes(
// attrs...,
// ))
span.End()
}
// Add context and started span to Kafka consume telemetry context
kctCtx.ctx = ctx
kctCtx.span = span

return ctx, kctCtx
}

type KafkaConsumeTelemetryContext struct {
startTime time.Time
latency metric.Float64Histogram
msg *sarama.ConsumerMessage
consumerGroup string

return ctx, endConsume
ctx context.Context
span trace.Span
}

func (k *KafkaConsumeTelemetryContext) End(
err error,
) {
elapsedTime := float64(time.Since(k.startTime)) / float64(time.Millisecond)

attrs := semconv.WithMessagingKafkaConsumerAttributes(k.msg, k.consumerGroup)
if err != nil {
attrs = append(attrs, semconv.ErrorType.String(err.Error()))
}
k.latency.Record(k.ctx, elapsedTime,
metric.WithAttributes(
attrs...,
))
k.span.End()
}
8 changes: 4 additions & 4 deletions apps/golang/kafkaconsumer/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ func (g *groupHandler) consumeMessage(
msg *sarama.ConsumerMessage,
) error {

// Create consumer span (parent)
// Create Kafka consume telemetry context
ctx := context.Background()
ctx, endConsume := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId)
ctx, kctCtx := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId)

// Parse name out of the message
name := string(msg.Value)
Expand All @@ -213,15 +213,15 @@ func (g *groupHandler) consumeMessage(
err := g.storeIntoDb(ctx, name)
if err != nil {
g.logger.Log(logrus.ErrorLevel, ctx, name, "Consuming message is failed.")
endConsume(err)
kctCtx.End(err)
return nil
}

// Acknowledge message
session.MarkMessage(msg, "")
g.logger.Log(logrus.InfoLevel, ctx, name, "Consuming message is succeeded.")

endConsume(nil)
kctCtx.End(nil)
return nil
}

Expand Down

0 comments on commit a67edcd

Please sign in to comment.