Skip to content

Commit

Permalink
Add error recording
Browse files Browse the repository at this point in the history
  • Loading branch information
utr1903 committed Feb 4, 2024
1 parent 829cb8a commit c38a80d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
12 changes: 9 additions & 3 deletions apps/golang/commons/otel/kafka/kafka_consumer_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (k *KafkaConsumer) Intercept(
consumerGroup string,
) (
context.Context,
func(),
func(err error),
) {
consumeStartTime := time.Now()

Expand All @@ -82,11 +82,17 @@ func (k *KafkaConsumer) Intercept(
)

// Record consumer latency
endConsume := func() {
endConsume := func(
err error,
) {
elapsedTime := float64(time.Since(consumeStartTime)) / float64(time.Millisecond)
attrs := semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup)
if err != nil {
attrs = append(attrs, semconv.ErrorType.String(err.Error()))
}
k.latency.Record(ctx, elapsedTime,
metric.WithAttributes(
semconv.WithMessagingKafkaConsumerAttributes(msg, consumerGroup)...,
attrs...,
))
span.End()
}
Expand Down
3 changes: 2 additions & 1 deletion apps/golang/kafkaconsumer/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ func (g *groupHandler) consumeMessage(
// Create consumer span (parent)
ctx := context.Background()
ctx, endConsume := g.Consumer.Intercept(ctx, msg, g.Opts.ConsumerGroupId)
defer endConsume()

// Parse name out of the message
name := string(msg.Value)
Expand All @@ -214,13 +213,15 @@ func (g *groupHandler) consumeMessage(
err := g.storeIntoDb(ctx, name)
if err != nil {
g.logger.Log(logrus.ErrorLevel, ctx, name, "Consuming message is failed.")
endConsume(err)
return nil
}

// Acknowledge message
session.MarkMessage(msg, "")
g.logger.Log(logrus.InfoLevel, ctx, name, "Consuming message is succeeded.")

endConsume(nil)
return nil
}

Expand Down

0 comments on commit c38a80d

Please sign in to comment.