Skip to content

Commit

Permalink
chore(checkoutservice): add producer interceptor for tracing (#1400)
Browse files Browse the repository at this point in the history
* chore(checkoutservice): add producer interceptor for tracing

* chore(checkoutservice): update changelog

---------

Co-authored-by: Pierre Tessier <pierre@pierretessier.com>
  • Loading branch information
tranngoclam and puckpuck authored Feb 24, 2024
1 parent ef31bfd commit cf7bac7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ the release.

## Unreleased

* [checkoutservice] add producer interceptor for tracing
([#1400](https://github.com/open-telemetry/opentelemetry-demo/pull/1400))
* [chore] increase memory for Collector and Jaeger
([#1396](https://github.com/open-telemetry/opentelemetry-demo/pull/1396))
* [chore] fix Make targets for restart and redeploy
Expand Down
1 change: 1 addition & 0 deletions src/accountingservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func NewOTelInterceptor(groupID string) *OTelInterceptor {

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystemKafka,
semconv.MessagingOperationReceive,
semconv.MessagingKafkaConsumerGroup(groupID),
semconv.NetworkTransportTCP,
}
Expand Down
1 change: 1 addition & 0 deletions src/checkoutservice/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd
saramaConfig.Version = ProtocolVersion
// So we can know the partition and offset of messages.
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()}

producer, err := sarama.NewAsyncProducer(brokers, saramaConfig)
if err != nil {
Expand Down
60 changes: 60 additions & 0 deletions src/checkoutservice/kafka/trace_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package kafka

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"go.opentelemetry.io/otel/trace"

"github.com/IBM/sarama"
)

type OTelInterceptor struct {
tracer trace.Tracer
fixedAttrs []attribute.KeyValue
}

// NewOTelInterceptor processes span for intercepted messages and add some
// headers with the span data.
func NewOTelInterceptor() *OTelInterceptor {
oi := OTelInterceptor{}
oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/checkoutservice/sarama")

oi.fixedAttrs = []attribute.KeyValue{
semconv.MessagingSystemKafka,
semconv.MessagingOperationPublish,
semconv.NetworkTransportTCP,
}
return &oi
}

func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
spanContext, span := oi.tracer.Start(
context.Background(),
fmt.Sprintf("%s publish", msg.Topic),
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.PeerService("kafka"),
semconv.NetworkTransportTCP,
semconv.MessagingSystemKafka,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingOperationPublish,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
)
defer span.End()

carrier := propagation.MapCarrier{}
propagator := otel.GetTextMapPropagator()
propagator.Inject(spanContext, carrier)

for key, value := range carrier {
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
}
}
35 changes: 2 additions & 33 deletions src/checkoutservice/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"encoding/json"
"fmt"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"net"
"net/http"
"os"
Expand Down Expand Up @@ -311,7 +310,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq

// send to kafka only if kafka broker address is set
if cs.kafkaBrokerSvcAddr != "" {
cs.sendToPostProcessor(ctx, orderResult)
cs.sendToPostProcessor(orderResult)
}

resp := &pb.PlaceOrderResponse{Order: orderResult}
Expand Down Expand Up @@ -474,7 +473,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
return resp.GetTrackingId(), nil
}

func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
message, err := proto.Marshal(result)
if err != nil {
log.Errorf("Failed to marshal message to protobuf: %+v", err)
Expand All @@ -486,37 +485,7 @@ func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.O
Value: sarama.ByteEncoder(message),
}

// Inject tracing info into message
span := createProducerSpan(ctx, &msg)
defer span.End()

cs.KafkaProducerClient.Input() <- &msg
successMsg := <-cs.KafkaProducerClient.Successes()
log.Infof("Successful to write message. offset: %v", successMsg.Offset)
}

func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
spanContext, span := tracer.Start(
ctx,
fmt.Sprintf("%s publish", msg.Topic),
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.PeerService("kafka"),
semconv.NetworkTransportTCP,
semconv.MessagingSystemKafka,
semconv.MessagingDestinationName(msg.Topic),
semconv.MessagingOperationPublish,
semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
),
)

carrier := propagation.MapCarrier{}
propagator := otel.GetTextMapPropagator()
propagator.Inject(spanContext, carrier)

for key, value := range carrier {
msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
}

return span
}

0 comments on commit cf7bac7

Please sign in to comment.