Skip to content

Commit

Permalink
[accountingservice] add attributes to kafka spans (open-telemetry#1286)
Browse files Browse the repository at this point in the history
* add attributes to kafka spans

Signed-off-by: Pierre Tessier <pierre@pierretessier.com>

* add attributes to kafka spans

Signed-off-by: Pierre Tessier <pierre@pierretessier.com>

---------

Signed-off-by: Pierre Tessier <pierre@pierretessier.com>
  • Loading branch information
puckpuck authored Dec 4, 2023
1 parent 76046b0 commit b296ff3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ release.
([#1236](https://github.com/open-telemetry/opentelemetry-demo/pull/1236))
* [cartservice] Add .NET memory, CPU, and thread metrics
([#1265](https://github.com/open-telemetry/opentelemetry-demo/pull/1265))
* enable browser traffic in loadgenerator using playwright ([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266))
* enable browser traffic in loadgenerator using playwright
([#1266](https://github.com/open-telemetry/opentelemetry-demo/pull/1266))
* [accountingservice] Add additional attributes to Kafka spans
([#1286](https://github.com/open-telemetry/opentelemetry-demo/pull/1286))

## 1.6.0

Expand Down
2 changes: 1 addition & 1 deletion src/accountingservice/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func StartConsumerGroup(ctx context.Context, brokers []string, log *logrus.Logge
saramaConfig.Version = ProtocolVersion
// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor()}
saramaConfig.Consumer.Interceptors = []sarama.ConsumerInterceptor{NewOTelInterceptor(GroupID)}

consumerGroup, err := sarama.NewConsumerGroup(brokers, GroupID, saramaConfig)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion src/accountingservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ type OTelInterceptor struct {

// NewOTelInterceptor processes span for intercepted messages and add some
// headers with the span data.
func NewOTelInterceptor() *OTelInterceptor {
func NewOTelInterceptor(groupID string) *OTelInterceptor {
oi := OTelInterceptor{}
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystem("kafka"),
semconv.MessagingKafkaConsumerGroup(groupID),
semconv.NetTransportTCP,
}
return &oi
Expand All @@ -50,6 +51,8 @@ func (oi *OTelInterceptor) OnConsume(msg *sarama.ConsumerMessage) {
trace.WithAttributes(
semconv.MessagingDestinationKindTopic,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingKafkaMessageOffset(int(msg.Offset)),
semconv.MessagingMessagePayloadSizeBytes(len(msg.Value)),
semconv.MessagingOperationReceive,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
Expand Down

0 comments on commit b296ff3

Please sign in to comment.