diff --git a/src/accountingservice/kafka/trace_interceptor.go b/src/accountingservice/kafka/trace_interceptor.go
index 5e2bf1254e..91d26539a0 100644
--- a/src/accountingservice/kafka/trace_interceptor.go
+++ b/src/accountingservice/kafka/trace_interceptor.go
@@ -24,7 +24,7 @@ type OTelInterceptor struct {
 // headers with the span data.
 func NewOTelInterceptor(groupID string) *OTelInterceptor {
 	oi := OTelInterceptor{}
-	oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/accountingservice/sarama")
+	oi.tracer = otel.Tracer("accountingservice")
 
 	oi.fixedAttrs = []attribute.KeyValue{
 		semconv.MessagingSystemKafka,
diff --git a/src/checkoutservice/kafka/producer.go b/src/checkoutservice/kafka/producer.go
index 0d9ea5684a..6ee773d8c2 100644
--- a/src/checkoutservice/kafka/producer.go
+++ b/src/checkoutservice/kafka/producer.go
@@ -17,7 +17,6 @@ func CreateKafkaProducer(brokers []string, log *logrus.Logger) (sarama.AsyncProd
 	saramaConfig.Version = ProtocolVersion
 	// So we can know the partition and offset of messages.
 	saramaConfig.Producer.Return.Successes = true
-	saramaConfig.Producer.Interceptors = []sarama.ProducerInterceptor{NewOTelInterceptor()}
 
 	producer, err := sarama.NewAsyncProducer(brokers, saramaConfig)
 	if err != nil {
diff --git a/src/checkoutservice/kafka/trace_interceptor.go b/src/checkoutservice/kafka/trace_interceptor.go
deleted file mode 100644
index 63b4c3cdcc..0000000000
--- a/src/checkoutservice/kafka/trace_interceptor.go
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-package kafka
-
-import (
-	"context"
-	"fmt"
-
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/propagation"
-	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
-	"go.opentelemetry.io/otel/trace"
-
-	"github.com/IBM/sarama"
-)
-
-type OTelInterceptor struct {
-	tracer     trace.Tracer
-	fixedAttrs []attribute.KeyValue
-}
-
-// NewOTelInterceptor processes span for intercepted messages and add some
-// headers with the span data.
-func NewOTelInterceptor() *OTelInterceptor {
-	oi := OTelInterceptor{}
-	oi.tracer = otel.Tracer("github.com/open-telemetry/opentelemetry-demo/checkoutservice/sarama")
-
-	oi.fixedAttrs = []attribute.KeyValue{
-		semconv.MessagingSystemKafka,
-		semconv.MessagingOperationPublish,
-		semconv.NetworkTransportTCP,
-	}
-	return &oi
-}
-
-func (oi *OTelInterceptor) OnSend(msg *sarama.ProducerMessage) {
-	spanContext, span := oi.tracer.Start(
-		context.Background(),
-		fmt.Sprintf("%s publish", msg.Topic),
-		trace.WithSpanKind(trace.SpanKindProducer),
-		trace.WithAttributes(
-			semconv.PeerService("kafka"),
-			semconv.NetworkTransportTCP,
-			semconv.MessagingSystemKafka,
-			semconv.MessagingDestinationName(msg.Topic),
-			semconv.MessagingOperationPublish,
-			semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
-		),
-	)
-	defer span.End()
-
-	carrier := propagation.MapCarrier{}
-	propagator := otel.GetTextMapPropagator()
-	propagator.Inject(spanContext, carrier)
-
-	for key, value := range carrier {
-		msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
-	}
-}
diff --git a/src/checkoutservice/main.go b/src/checkoutservice/main.go
index 35119e4eb3..687b7e307b 100644
--- a/src/checkoutservice/main.go
+++ b/src/checkoutservice/main.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
 	"net"
 	"net/http"
 	"os"
@@ -310,7 +311,7 @@ func (cs *checkoutService) PlaceOrder(ctx context.Context, req *pb.PlaceOrderReq
 
 	// send to kafka only if kafka broker address is set
 	if cs.kafkaBrokerSvcAddr != "" {
-		cs.sendToPostProcessor(orderResult)
+		cs.sendToPostProcessor(ctx, orderResult)
 	}
 
 	resp := &pb.PlaceOrderResponse{Order: orderResult}
@@ -473,7 +474,7 @@ func (cs *checkoutService) shipOrder(ctx context.Context, address *pb.Address, i
 	return resp.GetTrackingId(), nil
 }
 
-func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
+func (cs *checkoutService) sendToPostProcessor(ctx context.Context, result *pb.OrderResult) {
 	message, err := proto.Marshal(result)
 	if err != nil {
 		log.Errorf("Failed to marshal message to protobuf: %+v", err)
@@ -485,7 +486,37 @@ func (cs *checkoutService) sendToPostProcessor(result *pb.OrderResult) {
 		Value: sarama.ByteEncoder(message),
 	}
 
+	// Inject tracing info into message
+	span := createProducerSpan(ctx, &msg)
+	defer span.End()
+
 	cs.KafkaProducerClient.Input() <- &msg
 	successMsg := <-cs.KafkaProducerClient.Successes()
 	log.Infof("Successful to write message. offset: %v", successMsg.Offset)
 }
+
+func createProducerSpan(ctx context.Context, msg *sarama.ProducerMessage) trace.Span {
+	spanContext, span := tracer.Start(
+		ctx,
+		fmt.Sprintf("%s publish", msg.Topic),
+		trace.WithSpanKind(trace.SpanKindProducer),
+		trace.WithAttributes(
+			semconv.PeerService("kafka"),
+			semconv.NetworkTransportTCP,
+			semconv.MessagingSystemKafka,
+			semconv.MessagingDestinationName(msg.Topic),
+			semconv.MessagingOperationPublish,
+			semconv.MessagingKafkaDestinationPartition(int(msg.Partition)),
+		),
+	)
+
+	carrier := propagation.MapCarrier{}
+	propagator := otel.GetTextMapPropagator()
+	propagator.Inject(spanContext, carrier)
+
+	for key, value := range carrier {
+		msg.Headers = append(msg.Headers, sarama.RecordHeader{Key: []byte(key), Value: []byte(value)})
+	}
+
+	return span
+}
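
Note that the Inject call in createProducerSpan only writes traceparent/baggage headers if a global text-map propagator has been registered; otherwise it is a no-op. The demo services normally register one during tracer setup. A minimal sketch of that registration follows; the helper name initPropagator and its placement are assumptions for illustration, not part of this diff:

package main

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

// initPropagator registers the global propagator that
// otel.GetTextMapPropagator() returns in createProducerSpan.
func initPropagator() {
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{}, // W3C traceparent/tracestate headers
		propagation.Baggage{},      // W3C baggage header
	))
}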
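
On the consuming side, the accountingservice interceptor (whose tracer name this diff changes) continues the trace by reading those injected headers back. The sketch below only illustrates that extraction step under the same sarama/OTel APIs used above; the helper name startConsumerSpan is hypothetical and the accountingservice's real interceptor differs in detail:

package kafka

import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
	"go.opentelemetry.io/otel/trace"
)

// startConsumerSpan (illustrative) copies the record headers written by
// createProducerSpan into a carrier, extracts the remote span context, and
// starts a consumer span as a child of the producer span, so the Kafka hop
// shows up as one connected trace.
func startConsumerSpan(msg *sarama.ConsumerMessage) (context.Context, trace.Span) {
	carrier := propagation.MapCarrier{}
	for _, header := range msg.Headers {
		carrier[string(header.Key)] = string(header.Value)
	}

	ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
	tracer := otel.Tracer("accountingservice")
	return tracer.Start(
		ctx,
		fmt.Sprintf("%s receive", msg.Topic),
		trace.WithSpanKind(trace.SpanKindConsumer),
		trace.WithAttributes(
			semconv.MessagingSystemKafka,
			semconv.MessagingDestinationName(msg.Topic),
		),
	)
}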