diff --git a/docs/semantic-conventions.md b/docs/semantic-conventions.md index e261ce43531f..3a3a5a8df4a2 100644 --- a/docs/semantic-conventions.md +++ b/docs/semantic-conventions.md @@ -89,3 +89,22 @@ not values defined by spec. | `db.hbase` | Y | -, HBase is not supported | | `db.redis.database_index` | N | only set for Lettuce driver, not for Jedis | | `db.mongodb.collection` | Y | - | + +## Messaging + + Attribute name | Required? | Implemented? | +| -------------- | :-----: | :---: | +| `messaging.system` | Y | + | +| `messaging.destination` | Y | + | +| `messaging.destination_kind` | Y | + | +| `messaging.temp_destination` | N | - | +| `messaging.protocol` | N | - | +| `messaging.protocol_version` | N | - | +| `messaging.url` | N | - | +| `messaging.message_id` | N | only for JMS | +| `messaging.conversation_id` | N | only for JMS | +| `messaging.message_payload_size_bytes` | N | only for RabbitMQ and Kafka [1] | +| `messaging.message_payload_compressed_size_bytes` | N | - | +| `messaging.operation` | for consumers only | + + +**[1]:** Kafka consumer instrumentation sets this to the serialized size of the value diff --git a/instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy b/instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy index 5cd6ae249f73..b3b446faaada 100644 --- a/instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy +++ b/instrumentation/jms-1.1/src/jms2Test/groovy/JMS2Test.groovy @@ -3,13 +3,13 @@ * SPDX-License-Identifier: Apache-2.0 */ -import static io.opentelemetry.trace.Span.Kind.CLIENT import static io.opentelemetry.trace.Span.Kind.CONSUMER import static io.opentelemetry.trace.Span.Kind.PRODUCER import com.google.common.io.Files import io.opentelemetry.instrumentation.test.AgentTestRunner import io.opentelemetry.instrumentation.test.asserts.TraceAssert +import io.opentelemetry.javaagent.instrumentation.jms.JMSTracer import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.trace.attributes.SemanticAttributes import java.util.concurrent.CountDownLatch @@ -29,7 +29,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory import org.hornetq.core.server.HornetQServer import org.hornetq.core.server.HornetQServers -import org.hornetq.jms.client.HornetQMessageConsumer import org.hornetq.jms.client.HornetQTextMessage import spock.lang.Shared @@ -99,7 +98,7 @@ class JMS2Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, messageId, false, HornetQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive") } } @@ -111,8 +110,8 @@ class JMS2Test extends AgentTestRunner { destination | destinationType | destinationName session.createQueue("someQueue") | "queue" | "someQueue" session.createTopic("someTopic") | "topic" | "someTopic" - session.createTemporaryQueue() | "queue" | "" - session.createTemporaryTopic() | "topic" | "" + session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME + session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME } def "sending to a MessageListener on #destinationName #destinationType generates a span"() { @@ -136,7 +135,7 @@ class JMS2Test extends AgentTestRunner { assertTraces(1) { trace(0, 2) { producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), true, consumer.messageListener.class, span(0)) + consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") } } // This check needs to go after all traces have been accounted for @@ -150,8 +149,8 @@ class JMS2Test extends AgentTestRunner { destination | destinationType | destinationName session.createQueue("someQueue") | "queue" | "someQueue" session.createTopic("someTopic") | "topic" | "someTopic" - session.createTemporaryQueue() | "queue" | "" - session.createTemporaryTopic() | "topic" | "" + session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME + session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME } def "failing to receive message with receiveNoWait on #destinationName #destinationType works"() { @@ -167,12 +166,14 @@ class JMS2Test extends AgentTestRunner { trace(0, 1) { // Consumer trace span(0) { hasNoParent() - name destinationType + "/" + destinationName + " receive" - kind CLIENT + name destinationName + " receive" + kind CONSUMER errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" } } } @@ -200,12 +201,15 @@ class JMS2Test extends AgentTestRunner { trace(0, 1) { // Consumer trace span(0) { hasNoParent() - name destinationType + "/" + destinationName + " receive" - kind CLIENT + name destinationName + " receive" + kind CONSUMER errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + } } } @@ -222,42 +226,45 @@ class JMS2Test extends AgentTestRunner { static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { trace.span(index) { - name destinationType + "/" + destinationName + " send" + name destinationName + " send" kind PRODUCER errored false hasNoParent() attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName - if (destinationName == "") { - "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) { + "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true } } } } - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, boolean messageListener, Class origin, Object parentOrLinkedSpan) { + // passing messageId = null will verify message.id is not captured, + // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), + // any other value for messageId will verify that message.id is captured and has that same value + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) { trace.span(index) { - name destinationType + "/" + destinationName + " receive" - if (messageListener) { - kind CONSUMER + name destinationName + " " + operation + kind CONSUMER + if (parentOrLinkedSpan != null) { childOf((SpanData) parentOrLinkedSpan) } else { - kind CLIENT hasNoParent() - hasLink((SpanData) parentOrLinkedSpan) } errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_OPERATION.key}" operation if (messageId != null) { - "${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" messageId - } else { - "${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" String + //In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" { it == messageId || messageId == "" } } - if (destinationName == "") { - "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true + if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) { + "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true } } } diff --git a/instrumentation/jms-1.1/src/jms2Test/groovy/SpringListenerJMS2Test.groovy b/instrumentation/jms-1.1/src/jms2Test/groovy/SpringListenerJMS2Test.groovy index f45c81740548..27cf49eff192 100644 --- a/instrumentation/jms-1.1/src/jms2Test/groovy/SpringListenerJMS2Test.groovy +++ b/instrumentation/jms-1.1/src/jms2Test/groovy/SpringListenerJMS2Test.groovy @@ -9,10 +9,8 @@ import static JMS2Test.producerSpan import io.opentelemetry.instrumentation.test.AgentTestRunner import javax.jms.ConnectionFactory import listener.Config -import org.hornetq.jms.client.HornetQMessageConsumer import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.jms.core.JmsTemplate -import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter class SpringListenerJMS2Test extends AgentTestRunner { def "receiving message in spring listener generates spans"() { @@ -26,10 +24,10 @@ class SpringListenerJMS2Test extends AgentTestRunner { assertTraces(2) { trace(0, 2) { producerSpan(it, 0, "queue", "SpringListenerJMS2") - consumerSpan(it, 1, "queue", "SpringListenerJMS2", null, true, MessagingMessageListenerAdapter, span(0)) + consumerSpan(it, 1, "queue", "SpringListenerJMS2", "", span(0), "process") } trace(1, 1) { - consumerSpan(it, 0, "queue", "SpringListenerJMS2", null, false, HornetQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, "queue", "SpringListenerJMS2", "", null, "receive") } } diff --git a/instrumentation/jms-1.1/src/jms2Test/groovy/SpringTemplateJMS2Test.groovy b/instrumentation/jms-1.1/src/jms2Test/groovy/SpringTemplateJMS2Test.groovy index f45d25678171..d38b41f8dfd8 100644 --- a/instrumentation/jms-1.1/src/jms2Test/groovy/SpringTemplateJMS2Test.groovy +++ b/instrumentation/jms-1.1/src/jms2Test/groovy/SpringTemplateJMS2Test.groovy @@ -23,7 +23,6 @@ import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory import org.hornetq.core.server.HornetQServer import org.hornetq.core.server.HornetQServers -import org.hornetq.jms.client.HornetQMessageConsumer import org.springframework.jms.core.JmsTemplate import spock.lang.Shared @@ -90,7 +89,7 @@ class SpringTemplateJMS2Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive") } } @@ -123,14 +122,13 @@ class SpringTemplateJMS2Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, msgId.get(), false, HornetQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, msgId.get(), null, "receive") } trace(2, 1) { - // receive doesn't propagate the trace, so this is a root - producerSpan(it, 0, "queue", "") + producerSpan(it, 0, "queue", "(temporary)") } trace(3, 1) { - consumerSpan(it, 0, "queue", "", receivedMessage.getJMSMessageID(), false, HornetQMessageConsumer, traces[2][0]) + consumerSpan(it, 0, "queue", "(temporary)", receivedMessage.getJMSMessageID(), null, "receive") } } diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSDecorator.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSDecorator.java deleted file mode 100644 index d46cbf8e8722..000000000000 --- a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSDecorator.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.jms; - -import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.instrumentation.api.decorator.ClientDecorator; -import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.Tracer; -import io.opentelemetry.trace.attributes.SemanticAttributes; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; -import javax.jms.Topic; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class JMSDecorator extends ClientDecorator { - private static final Logger log = LoggerFactory.getLogger(JMSDecorator.class); - - public static final JMSDecorator DECORATE = new JMSDecorator(); - - public static final Tracer TRACER = OpenTelemetry.getTracer("io.opentelemetry.auto.jms-1.1"); - - public String spanNameForConsumer(Message message) { - return toSpanName(message, null, "receive"); - } - - public String spanNameForProducer(Message message, Destination destination) { - return toSpanName(message, destination, "send"); - } - - private static final String TIBCO_TMP_PREFIX = "$TMP$"; - - public static String toSpanName(Message message, Destination destination, String operationName) { - Destination jmsDestination = null; - try { - jmsDestination = message.getJMSDestination(); - } catch (Exception e) { - } - if (jmsDestination == null) { - jmsDestination = destination; - } - return toSpanName(jmsDestination, operationName); - } - - public static String toSpanName(Destination destination, String operationName) { - try { - if (destination instanceof Queue) { - String queueName = ((Queue) destination).getQueueName(); - if (destination instanceof TemporaryQueue || queueName.startsWith(TIBCO_TMP_PREFIX)) { - return "queue/ " + operationName; - } else { - return "queue/" + queueName + " " + operationName; - } - } - if (destination instanceof Topic) { - String topicName = ((Topic) destination).getTopicName(); - if (destination instanceof TemporaryTopic || topicName.startsWith(TIBCO_TMP_PREFIX)) { - return "topic/ " + operationName; - } else { - return "topic/" + topicName + " " + operationName; - } - } - } catch (Exception e) { - } - return "destination"; - } - - public void afterStart(Span span, String spanName, Message message) { - super.afterStart(span); - if (spanName.startsWith("queue/")) { - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"); - span.setAttribute( - SemanticAttributes.MESSAGING_DESTINATION, - spanName.replaceFirst("^queue/", "").replaceFirst(" (send|receive)$", "")); - } else if (spanName.startsWith("topic/")) { - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); - span.setAttribute( - SemanticAttributes.MESSAGING_DESTINATION, - spanName.replaceFirst("^topic/", "").replaceFirst(" (send|receive)$", "")); - } - if (spanName.startsWith("queue/") || spanName.startsWith("topic/")) { - span.setAttribute(SemanticAttributes.MESSAGING_TEMP_DESTINATION, true); - } - - if (message != null) { - try { - String messageID = message.getJMSMessageID(); - if (messageID != null) { - span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageID); - } - } catch (Exception e) { - log.debug("Failure getting JMS message id", e); - } - - try { - String correlationID = message.getJMSCorrelationID(); - if (correlationID != null) { - span.setAttribute(SemanticAttributes.MESSAGING_CONVERSATION_ID, correlationID); - } - } catch (Exception e) { - log.debug("Failure getting JMS correlation id", e); - } - } - } -} diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageConsumerInstrumentation.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageConsumerInstrumentation.java index 2b2e57d4de97..77231e286272 100644 --- a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageConsumerInstrumentation.java +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageConsumerInstrumentation.java @@ -5,28 +5,20 @@ package io.opentelemetry.javaagent.instrumentation.jms; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER; +import static io.opentelemetry.javaagent.instrumentation.jms.JMSTracer.TRACER; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static io.opentelemetry.trace.Span.Kind.CLIENT; -import static io.opentelemetry.trace.TracingContextUtils.getSpan; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import com.google.auto.service.AutoService; -import io.grpc.Context; import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext; import io.opentelemetry.javaagent.tooling.Instrumenter; import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.SpanContext; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; import javax.jms.Message; import javax.jms.MessageConsumer; import net.bytebuddy.asm.Advice; @@ -55,7 +47,8 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".JMSDecorator", + packageName + ".MessageDestination", + packageName + ".JMSTracer", packageName + ".MessageExtractAdapter", packageName + ".MessageInjectAdapter" }; @@ -75,7 +68,9 @@ public Map, String> transfor @Override public Map contextStore() { - return singletonMap("javax.jms.MessageConsumer", "java.lang.String"); + return singletonMap( + "javax.jms.MessageConsumer", + "io.opentelemetry.javaagent.instrumentation.jms.MessageDestination"); } public static class ConsumerAdvice { @@ -91,33 +86,25 @@ public static void stopSpan( @Advice.Enter long startTime, @Advice.Return Message message, @Advice.Thrown Throwable throwable) { - String spanName; + MessageDestination destination; if (message == null) { - spanName = InstrumentationContext.get(MessageConsumer.class, String.class).get(consumer); - if (spanName == null) { - spanName = "destination"; + destination = + InstrumentationContext.get(MessageConsumer.class, MessageDestination.class) + .get(consumer); + if (destination == null) { + destination = MessageDestination.UNKNOWN; } } else { - spanName = DECORATE.spanNameForConsumer(message); + destination = TRACER.extractDestination(message, null); } - Span.Builder spanBuilder = - TRACER - .spanBuilder(spanName) - .setSpanKind(CLIENT) - .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime)); - if (message != null) { - Context context = extract(message, GETTER); - SpanContext spanContext = getSpan(context).getContext(); - if (spanContext.isValid()) { - spanBuilder.addLink(spanContext); - } - } - Span span = spanBuilder.startSpan(); - DECORATE.afterStart(span, spanName, message); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.end(); + Span span = TRACER.startConsumerSpan(destination, "receive", message, startTime); + + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); + } } } } diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageListenerInstrumentation.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageListenerInstrumentation.java index 959e5aa82abf..b85aa6fe34e2 100644 --- a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageListenerInstrumentation.java +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageListenerInstrumentation.java @@ -5,21 +5,16 @@ package io.opentelemetry.javaagent.instrumentation.jms; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER; +import static io.opentelemetry.javaagent.instrumentation.jms.JMSTracer.TRACER; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static io.opentelemetry.trace.Span.Kind.CONSUMER; -import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; -import io.opentelemetry.javaagent.instrumentation.api.SpanWithScope; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.tooling.Instrumenter; import io.opentelemetry.trace.Span; import java.util.Map; @@ -50,7 +45,8 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".JMSDecorator", + packageName + ".MessageDestination", + packageName + ".JMSTracer", packageName + ".MessageExtractAdapter", packageName + ".MessageInjectAdapter" }; @@ -66,29 +62,28 @@ public Map, String> transfor public static class MessageListenerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static SpanWithScope onEnter(@Advice.Argument(0) Message message) { + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { - String spanName = DECORATE.spanNameForConsumer(message); - Span.Builder spanBuilder = TRACER.spanBuilder(spanName).setSpanKind(CONSUMER); - spanBuilder.setParent(extract(message, GETTER)); - - Span span = spanBuilder.startSpan(); - DECORATE.afterStart(span, spanName, message); - - return new SpanWithScope(span, currentContextWith(span)); + MessageDestination destination = TRACER.extractDestination(message, null); + span = TRACER.startConsumerSpan(destination, "process", message, System.currentTimeMillis()); + scope = TRACER.startScope(span); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter SpanWithScope spanWithScope, @Advice.Thrown Throwable throwable) { - if (spanWithScope == null) { - return; + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + scope.close(); + + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); } - Span span = spanWithScope.getSpan(); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.end(); - spanWithScope.closeScope(); } } } diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageProducerInstrumentation.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageProducerInstrumentation.java index 92c2587d37f5..c5614a091c29 100644 --- a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageProducerInstrumentation.java +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSMessageProducerInstrumentation.java @@ -5,23 +5,16 @@ package io.opentelemetry.javaagent.instrumentation.jms; -import static io.opentelemetry.context.ContextUtils.withScopedContext; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.jms.JMSDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER; +import static io.opentelemetry.javaagent.instrumentation.jms.JMSTracer.TRACER; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static io.opentelemetry.trace.Span.Kind.PRODUCER; -import static io.opentelemetry.trace.TracingContextUtils.withSpan; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArgument; import com.google.auto.service.AutoService; -import io.grpc.Context; -import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap; -import io.opentelemetry.javaagent.instrumentation.api.SpanWithScope; import io.opentelemetry.javaagent.tooling.Instrumenter; import io.opentelemetry.trace.Span; import java.util.HashMap; @@ -56,7 +49,8 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".JMSDecorator", + packageName + ".MessageDestination", + packageName + ".JMSTracer", packageName + ".MessageExtractAdapter", packageName + ".MessageInjectAdapter" }; @@ -80,11 +74,14 @@ public Map, String> transfor public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static SpanWithScope onEnter( - @Advice.Argument(0) Message message, @Advice.This MessageProducer producer) { + public static void onEnter( + @Advice.Argument(0) Message message, + @Advice.This MessageProducer producer, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { int callDepth = CallDepthThreadLocalMap.incrementCallDepth(MessageProducer.class); if (callDepth > 0) { - return null; + return; } Destination defaultDestination; @@ -94,67 +91,64 @@ public static SpanWithScope onEnter( defaultDestination = null; } - String spanName = DECORATE.spanNameForProducer(message, defaultDestination); - Span span = TRACER.spanBuilder(spanName).setSpanKind(PRODUCER).startSpan(); - DECORATE.afterStart(span, spanName, message); - - Context context = withSpan(span, Context.current()); - OpenTelemetry.getPropagators().getTextMapPropagator().inject(context, message, SETTER); - - return new SpanWithScope(span, withScopedContext(context)); + MessageDestination messageDestination = + TRACER.extractDestination(message, defaultDestination); + span = TRACER.startProducerSpan(messageDestination, message); + scope = TRACER.startProducerScope(span, message); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter SpanWithScope spanWithScope, @Advice.Thrown Throwable throwable) { - if (spanWithScope == null) { + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (scope == null) { return; } + scope.close(); CallDepthThreadLocalMap.reset(MessageProducer.class); - Span span = spanWithScope.getSpan(); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - - span.end(); - spanWithScope.closeScope(); + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); + } } } public static class ProducerWithDestinationAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static SpanWithScope onEnter( - @Advice.Argument(0) Destination destination, @Advice.Argument(1) Message message) { + public static void onEnter( + @Advice.Argument(0) Destination destination, + @Advice.Argument(1) Message message, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { int callDepth = CallDepthThreadLocalMap.incrementCallDepth(MessageProducer.class); if (callDepth > 0) { - return null; + return; } - String spanName = DECORATE.spanNameForProducer(message, destination); - - Span span = TRACER.spanBuilder(spanName).setSpanKind(PRODUCER).startSpan(); - DECORATE.afterStart(span, spanName, message); - - Context context = withSpan(span, Context.current()); - OpenTelemetry.getPropagators().getTextMapPropagator().inject(context, message, SETTER); - - return new SpanWithScope(span, withScopedContext(context)); + MessageDestination messageDestination = TRACER.extractDestination(message, destination); + span = TRACER.startProducerSpan(messageDestination, message); + scope = TRACER.startScope(span); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter SpanWithScope spanWithScope, @Advice.Thrown Throwable throwable) { - if (spanWithScope == null) { + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope, + @Advice.Thrown Throwable throwable) { + if (scope == null) { return; } CallDepthThreadLocalMap.reset(MessageProducer.class); - Span span = spanWithScope.getSpan(); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.end(); - spanWithScope.closeScope(); + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); + } } } } diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSSessionInstrumentation.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSSessionInstrumentation.java index 8d64e5267750..29adaa8879f3 100644 --- a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSSessionInstrumentation.java +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSSessionInstrumentation.java @@ -43,7 +43,7 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { - return new String[] {packageName + ".JMSDecorator"}; + return new String[] {packageName + ".MessageDestination", packageName + ".JMSTracer"}; } @Override @@ -57,7 +57,9 @@ public Map, String> transfor @Override public Map contextStore() { - return singletonMap("javax.jms.MessageConsumer", "java.lang.String"); + return singletonMap( + "javax.jms.MessageConsumer", + "io.opentelemetry.javaagent.instrumentation.jms.MessageDestination"); } public static class ConsumerAdvice { @@ -65,8 +67,9 @@ public static class ConsumerAdvice { @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void onExit( @Advice.Argument(0) Destination destination, @Advice.Return MessageConsumer consumer) { - String spanName = JMSDecorator.toSpanName(destination, "receive"); - InstrumentationContext.get(MessageConsumer.class, String.class).put(consumer, spanName); + MessageDestination messageDestination = JMSTracer.extractMessageDestination(destination); + InstrumentationContext.get(MessageConsumer.class, MessageDestination.class) + .put(consumer, messageDestination); } } } diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSTracer.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSTracer.java new file mode 100644 index 000000000000..a3bc000ea7cf --- /dev/null +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/JMSTracer.java @@ -0,0 +1,168 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms; + +import static io.opentelemetry.context.ContextUtils.withScopedContext; +import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; +import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER; +import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER; +import static io.opentelemetry.trace.Span.Kind.CONSUMER; +import static io.opentelemetry.trace.Span.Kind.PRODUCER; +import static io.opentelemetry.trace.TracingContextUtils.getSpan; +import static io.opentelemetry.trace.TracingContextUtils.withSpan; + +import io.grpc.Context; +import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.context.Scope; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.SpanContext; +import io.opentelemetry.trace.attributes.SemanticAttributes; +import java.util.concurrent.TimeUnit; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JMSTracer extends BaseTracer { + private static final Logger log = LoggerFactory.getLogger(JMSTracer.class); + + // From the spec + public static final String TEMP_DESTINATION_NAME = "(temporary)"; + + public static final JMSTracer TRACER = new JMSTracer(); + + public Span startConsumerSpan( + MessageDestination destination, String operation, Message message, long startTime) { + Span.Builder spanBuilder = + tracer + .spanBuilder(spanName(destination, operation)) + .setSpanKind(CONSUMER) + .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime)) + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, operation); + + if (message != null && "process".equals(operation)) { + Context context = extract(message, GETTER); + SpanContext spanContext = getSpan(context).getContext(); + if (spanContext.isValid()) { + spanBuilder.setParent(context); + } + } + + Span span = spanBuilder.startSpan(); + afterStart(span, destination, message); + return span; + } + + public Span startProducerSpan(MessageDestination destination, Message message) { + Span span = tracer.spanBuilder(spanName(destination, "send")).setSpanKind(PRODUCER).startSpan(); + afterStart(span, destination, message); + return span; + } + + public Scope startProducerScope(Span span, Message message) { + Context context = withSpan(span, Context.current()); + OpenTelemetry.getPropagators().getTextMapPropagator().inject(context, message, SETTER); + return withScopedContext(context); + } + + public String spanName(MessageDestination destination, String operation) { + if (destination.temporary) { + return TEMP_DESTINATION_NAME + " " + operation; + } else { + return destination.destinationName + " " + operation; + } + } + + private static final String TIBCO_TMP_PREFIX = "$TMP$"; + + public MessageDestination extractDestination(Message message, Destination fallbackDestination) { + Destination jmsDestination = null; + try { + jmsDestination = message.getJMSDestination(); + } catch (Exception ignored) { + } + + if (jmsDestination == null) { + jmsDestination = fallbackDestination; + } + + return extractMessageDestination(jmsDestination); + } + + public static MessageDestination extractMessageDestination(Destination destination) { + if (destination instanceof Queue) { + String queueName; + try { + queueName = ((Queue) destination).getQueueName(); + } catch (JMSException e) { + queueName = "unknown"; + } + + boolean temporary = + destination instanceof TemporaryQueue || queueName.startsWith(TIBCO_TMP_PREFIX); + + return new MessageDestination(queueName, "queue", temporary); + } + + if (destination instanceof Topic) { + String topicName; + try { + topicName = ((Topic) destination).getTopicName(); + } catch (JMSException e) { + topicName = "unknown"; + } + + boolean temporary = + destination instanceof TemporaryTopic || topicName.startsWith(TIBCO_TMP_PREFIX); + + return new MessageDestination(topicName, "topic", temporary); + } + + return MessageDestination.UNKNOWN; + } + + private void afterStart(Span span, MessageDestination destination, Message message) { + span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "jms"); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, destination.destinationKind); + if (destination.temporary) { + span.setAttribute(SemanticAttributes.MESSAGING_TEMP_DESTINATION, true); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, TEMP_DESTINATION_NAME); + } else { + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, destination.destinationName); + } + + if (message != null) { + try { + String messageID = message.getJMSMessageID(); + if (messageID != null) { + span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, messageID); + } + } catch (Exception e) { + log.debug("Failure getting JMS message id", e); + } + + try { + String correlationID = message.getJMSCorrelationID(); + if (correlationID != null) { + span.setAttribute(SemanticAttributes.MESSAGING_CONVERSATION_ID, correlationID); + } + } catch (Exception e) { + log.debug("Failure getting JMS correlation id", e); + } + } + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.auto.jms-1.1"; + } +} diff --git a/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/MessageDestination.java b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/MessageDestination.java new file mode 100644 index 000000000000..662913e85d06 --- /dev/null +++ b/instrumentation/jms-1.1/src/main/java/io/opentelemetry/javaagent/instrumentation/jms/MessageDestination.java @@ -0,0 +1,21 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.jms; + +public class MessageDestination { + public static final MessageDestination UNKNOWN = + new MessageDestination("unknown", "unknown", false); + + public final String destinationName; + public final String destinationKind; + public final boolean temporary; + + public MessageDestination(String destinationName, String destinationKind, boolean temporary) { + this.destinationName = destinationName; + this.destinationKind = destinationKind; + this.temporary = temporary; + } +} diff --git a/instrumentation/jms-1.1/src/test/groovy/JMS1Test.groovy b/instrumentation/jms-1.1/src/test/groovy/JMS1Test.groovy index 58194a46ecc7..3d066d819f52 100644 --- a/instrumentation/jms-1.1/src/test/groovy/JMS1Test.groovy +++ b/instrumentation/jms-1.1/src/test/groovy/JMS1Test.groovy @@ -3,23 +3,12 @@ * SPDX-License-Identifier: Apache-2.0 */ -import static io.opentelemetry.trace.Span.Kind.CLIENT -import static io.opentelemetry.trace.Span.Kind.CONSUMER -import static io.opentelemetry.trace.Span.Kind.PRODUCER - import io.opentelemetry.instrumentation.test.AgentTestRunner import io.opentelemetry.instrumentation.test.asserts.TraceAssert +import io.opentelemetry.javaagent.instrumentation.jms.JMSTracer import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.trace.attributes.SemanticAttributes -import java.util.concurrent.CountDownLatch -import java.util.concurrent.atomic.AtomicReference -import javax.jms.Connection -import javax.jms.Message -import javax.jms.MessageListener -import javax.jms.Session -import javax.jms.TextMessage import org.apache.activemq.ActiveMQConnectionFactory -import org.apache.activemq.ActiveMQMessageConsumer import org.apache.activemq.command.ActiveMQTextMessage import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -28,7 +17,18 @@ import org.testcontainers.containers.output.Slf4jLogConsumer import spock.lang.Requires import spock.lang.Shared -@Requires({"true" != System.getenv("CIRCLECI")}) +import javax.jms.Connection +import javax.jms.Message +import javax.jms.MessageListener +import javax.jms.Session +import javax.jms.TextMessage +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference + +import static io.opentelemetry.trace.Span.Kind.CONSUMER +import static io.opentelemetry.trace.Span.Kind.PRODUCER + +@Requires({ "true" != System.getenv("CIRCLECI") }) class JMS1Test extends AgentTestRunner { private static final Logger logger = LoggerFactory.getLogger(JMS1Test) @@ -74,7 +74,7 @@ class JMS1Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, messageId, false, ActiveMQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, messageId, null, "receive") } } @@ -86,8 +86,8 @@ class JMS1Test extends AgentTestRunner { destination | destinationType | destinationName session.createQueue("someQueue") | "queue" | "someQueue" session.createTopic("someTopic") | "topic" | "someTopic" - session.createTemporaryQueue() | "queue" | "" - session.createTemporaryTopic() | "topic" | "" + session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME + session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME } def "sending to a MessageListener on #destinationName #destinationType generates a span"() { @@ -111,7 +111,7 @@ class JMS1Test extends AgentTestRunner { assertTraces(1) { trace(0, 2) { producerSpan(it, 0, destinationType, destinationName) - consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), true, consumer.messageListener.class, span(0)) + consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process") } } // This check needs to go after all traces have been accounted for @@ -125,8 +125,8 @@ class JMS1Test extends AgentTestRunner { destination | destinationType | destinationName session.createQueue("someQueue") | "queue" | "someQueue" session.createTopic("someTopic") | "topic" | "someTopic" - session.createTemporaryQueue() | "queue" | "" - session.createTemporaryTopic() | "topic" | "" + session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME + session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME } def "failing to receive message with receiveNoWait on #destinationName #destinationType works"() { @@ -142,12 +142,14 @@ class JMS1Test extends AgentTestRunner { trace(0, 1) { // Consumer trace span(0) { hasNoParent() - name destinationType + "/" + destinationName + " receive" - kind CLIENT + name destinationName + " receive" + kind CONSUMER errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" } } } @@ -175,12 +177,14 @@ class JMS1Test extends AgentTestRunner { trace(0, 1) { // Consumer trace span(0) { hasNoParent() - name destinationType + "/" + destinationName + " receive" - kind CLIENT + name destinationName + " receive" + kind CONSUMER errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" } } } @@ -223,15 +227,17 @@ class JMS1Test extends AgentTestRunner { trace(1, 1) { span(0) { hasNoParent() - name destinationType + "/" + destinationName + " receive" - kind CLIENT + name destinationName + " receive" + kind CONSUMER errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName - "${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" receivedMessage.getJMSMessageID() - if (destinationName == "") { - "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" receivedMessage.getJMSMessageID() + "${SemanticAttributes.MESSAGING_OPERATION.key}" "receive" + if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) { + "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true } } } @@ -246,48 +252,51 @@ class JMS1Test extends AgentTestRunner { destination | destinationType | destinationName session.createQueue("someQueue") | "queue" | "someQueue" session.createTopic("someTopic") | "topic" | "someTopic" - session.createTemporaryQueue() | "queue" | "" - session.createTemporaryTopic() | "topic" | "" + session.createTemporaryQueue() | "queue" | JMSTracer.TEMP_DESTINATION_NAME + session.createTemporaryTopic() | "topic" | JMSTracer.TEMP_DESTINATION_NAME } static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) { trace.span(index) { - name destinationType + "/" + destinationName + " send" + name destinationName + " send" kind PRODUCER errored false hasNoParent() attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName - if (destinationName == "") { - "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) { + "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true } } } } - static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, boolean messageListener, Class origin, Object parentOrLinkedSpan) { + // passing messageId = null will verify message.id is not captured, + // passing messageId = "" will verify message.id is captured (but won't verify anything about the value), + // any other value for messageId will verify that message.id is captured and has that same value + static consumerSpan(TraceAssert trace, int index, String destinationType, String destinationName, String messageId, Object parentOrLinkedSpan, String operation) { trace.span(index) { - name destinationType + "/" + destinationName + " receive" - if (messageListener) { - kind CONSUMER + name destinationName + " " + operation + kind CONSUMER + if (parentOrLinkedSpan != null) { childOf((SpanData) parentOrLinkedSpan) } else { - kind CLIENT hasNoParent() - hasLink((SpanData) parentOrLinkedSpan) } errored false attributes { - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key()}" destinationType - "${SemanticAttributes.MESSAGING_DESTINATION.key()}" destinationName + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "jms" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" destinationName + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" destinationType + "${SemanticAttributes.MESSAGING_OPERATION.key}" operation if (messageId != null) { - "${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" messageId - } else { - "${SemanticAttributes.MESSAGING_MESSAGE_ID.key()}" String + //In some tests we don't know exact messageId, so we pass "" and verify just the existence of the attribute + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" { it == messageId || messageId == "" } } - if (destinationName == "") { - "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key()}" true + if (destinationName == JMSTracer.TEMP_DESTINATION_NAME) { + "${SemanticAttributes.MESSAGING_TEMP_DESTINATION.key}" true } } } diff --git a/instrumentation/jms-1.1/src/test/groovy/SpringListenerJMS1Test.groovy b/instrumentation/jms-1.1/src/test/groovy/SpringListenerJMS1Test.groovy index 79bd5e54a5cb..b258f9d80f3f 100644 --- a/instrumentation/jms-1.1/src/test/groovy/SpringListenerJMS1Test.groovy +++ b/instrumentation/jms-1.1/src/test/groovy/SpringListenerJMS1Test.groovy @@ -9,13 +9,11 @@ import static JMS1Test.producerSpan import io.opentelemetry.instrumentation.test.AgentTestRunner import javax.jms.ConnectionFactory import listener.Config -import org.apache.activemq.ActiveMQMessageConsumer import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.jms.core.JmsTemplate -import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter import spock.lang.Requires -@Requires({"true" != System.getenv("CIRCLECI")}) +@Requires({ "true" != System.getenv("CIRCLECI") }) class SpringListenerJMS1Test extends AgentTestRunner { def "receiving message in spring listener generates spans"() { @@ -29,10 +27,10 @@ class SpringListenerJMS1Test extends AgentTestRunner { assertTraces(2) { trace(0, 2) { producerSpan(it, 0, "queue", "SpringListenerJMS1") - consumerSpan(it, 1, "queue", "SpringListenerJMS1", null, true, MessagingMessageListenerAdapter, span(0)) + consumerSpan(it, 1, "queue", "SpringListenerJMS1", "", span(0), "process") } trace(1, 1) { - consumerSpan(it, 0, "queue", "SpringListenerJMS1", null, false, ActiveMQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, "queue", "SpringListenerJMS1", "", null, "receive") } } diff --git a/instrumentation/jms-1.1/src/test/groovy/SpringTemplateJMS1Test.groovy b/instrumentation/jms-1.1/src/test/groovy/SpringTemplateJMS1Test.groovy index 52179f1d9943..950ec219a8d5 100644 --- a/instrumentation/jms-1.1/src/test/groovy/SpringTemplateJMS1Test.groovy +++ b/instrumentation/jms-1.1/src/test/groovy/SpringTemplateJMS1Test.groovy @@ -3,18 +3,10 @@ * SPDX-License-Identifier: Apache-2.0 */ -import static JMS1Test.consumerSpan -import static JMS1Test.producerSpan - import com.google.common.base.Stopwatch import io.opentelemetry.instrumentation.test.AgentTestRunner -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicReference -import javax.jms.Connection -import javax.jms.Session -import javax.jms.TextMessage +import io.opentelemetry.javaagent.instrumentation.jms.JMSTracer import org.apache.activemq.ActiveMQConnectionFactory -import org.apache.activemq.ActiveMQMessageConsumer import org.slf4j.Logger import org.slf4j.LoggerFactory import org.springframework.jms.core.JmsTemplate @@ -23,7 +15,16 @@ import org.testcontainers.containers.output.Slf4jLogConsumer import spock.lang.Requires import spock.lang.Shared -@Requires({"true" != System.getenv("CIRCLECI")}) +import javax.jms.Connection +import javax.jms.Session +import javax.jms.TextMessage +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +import static JMS1Test.consumerSpan +import static JMS1Test.producerSpan + +@Requires({ "true" != System.getenv("CIRCLECI") }) class SpringTemplateJMS1Test extends AgentTestRunner { private static final Logger logger = LoggerFactory.getLogger(SpringTemplateJMS1Test) @@ -67,7 +68,7 @@ class SpringTemplateJMS1Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), false, ActiveMQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, receivedMessage.getJMSMessageID(), null, "receive") } } @@ -104,14 +105,14 @@ class SpringTemplateJMS1Test extends AgentTestRunner { producerSpan(it, 0, destinationType, destinationName) } trace(1, 1) { - consumerSpan(it, 0, destinationType, destinationName, msgId.get(), false, ActiveMQMessageConsumer, traces[0][0]) + consumerSpan(it, 0, destinationType, destinationName, msgId.get(), null, "receive") } trace(2, 1) { // receive doesn't propagate the trace, so this is a root - producerSpan(it, 0, "queue", "") + producerSpan(it, 0, "queue", JMSTracer.TEMP_DESTINATION_NAME) } trace(3, 1) { - consumerSpan(it, 0, "queue", "", receivedMessage.getJMSMessageID(), false, ActiveMQMessageConsumer, traces[2][0]) + consumerSpan(it, 0, "queue", JMSTracer.TEMP_DESTINATION_NAME, receivedMessage.getJMSMessageID(), null, "receive") } } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java index ccea2d9790cd..57c99a7ee08a 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerInstrumentation.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaDecorator.DECORATE; +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaConsumerTracer.TRACER; import static net.bytebuddy.matcher.ElementMatchers.isMethod; import static net.bytebuddy.matcher.ElementMatchers.isPublic; import static net.bytebuddy.matcher.ElementMatchers.named; @@ -41,7 +41,7 @@ public ElementMatcher typeMatcher() { public String[] helperClassNames() { return new String[] { packageName + ".KafkaClientConfiguration", - packageName + ".KafkaDecorator", + packageName + ".KafkaConsumerTracer", packageName + ".TextMapExtractAdapter", packageName + ".TracingIterable", packageName + ".TracingIterator", @@ -79,9 +79,10 @@ public Map, String> transfor public static class IterableAdvice { @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap(@Advice.Return(readOnly = false) Iterable iterable) { + public static void wrap( + @Advice.Return(readOnly = false) Iterable> iterable) { if (iterable != null) { - iterable = new TracingIterable(iterable, DECORATE); + iterable = new TracingIterable(iterable, TRACER); } } } @@ -89,9 +90,9 @@ public static void wrap(@Advice.Return(readOnly = false) Iterable iterable) { + public static void wrap(@Advice.Return(readOnly = false) List> iterable) { if (iterable != null) { - iterable = new TracingList(iterable, DECORATE); + iterable = new TracingList(iterable, TRACER); } } } @@ -99,9 +100,10 @@ public static void wrap(@Advice.Return(readOnly = false) List it public static class IteratorAdvice { @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap(@Advice.Return(readOnly = false) Iterator iterator) { + public static void wrap( + @Advice.Return(readOnly = false) Iterator> iterator) { if (iterator != null) { - iterator = new TracingIterator(iterator, DECORATE); + iterator = new TracingIterator(iterator, TRACER); } } } diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java new file mode 100644 index 000000000000..54ffb782fa28 --- /dev/null +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaConsumerTracer.java @@ -0,0 +1,79 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkaclients; + +import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.TextMapExtractAdapter.GETTER; +import static io.opentelemetry.trace.Span.Kind.CONSUMER; + +import io.grpc.Context; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.attributes.SemanticAttributes; +import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; + +public class KafkaConsumerTracer extends BaseTracer { + public static final KafkaConsumerTracer TRACER = new KafkaConsumerTracer(); + + public Span startSpan(ConsumerRecord record) { + long now = System.currentTimeMillis(); + + Span span = + tracer + .spanBuilder(spanNameOnConsume(record)) + .setSpanKind(CONSUMER) + .setParent(extractParent(record)) + .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(now)) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()) + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + (long) record.serializedValueSize()) + .startSpan(); + + onConsume(span, now, record); + return span; + } + + private Context extractParent(ConsumerRecord record) { + if (KafkaClientConfiguration.isPropagationEnabled()) { + return extract(record.headers(), GETTER); + } else { + return Context.current(); + } + } + + public String spanNameOnConsume(ConsumerRecord record) { + return record.topic() + " process"; + } + + public void onConsume(Span span, long startTimeMillis, ConsumerRecord record) { + // TODO should we set topic + offset as messaging.message_id? + span.setAttribute("partition", record.partition()); + span.setAttribute("offset", record.offset()); + + if (record.value() == null) { + span.setAttribute("tombstone", true); + } + + // don't record a duration if the message was sent from an old Kafka client + if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + long produceTime = record.timestamp(); + // this attribute shows how much time elapsed between the producer and the consumer of this + // message, which can be helpful for identifying queue bottlenecks + span.setAttribute("record.queue_time_ms", Math.max(0L, startTimeMillis - produceTime)); + } + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.auto.kafka-clients-0.11"; + } +} diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaDecorator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaDecorator.java deleted file mode 100644 index ac2738d0ca9f..000000000000 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaDecorator.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients; - -import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.instrumentation.api.decorator.ClientDecorator; -import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.Tracer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.record.TimestampType; - -public class KafkaDecorator extends ClientDecorator { - public static final KafkaDecorator DECORATE = new KafkaDecorator(); - - public static final Tracer TRACER = - OpenTelemetry.getTracer("io.opentelemetry.auto.kafka-clients-0.11"); - - public String spanNameOnConsume(ConsumerRecord record) { - String topic = record.topic(); - if (topic != null) { - return topic; - } else { - return "destination"; - } - } - - public String spanNameOnProduce(ProducerRecord record) { - if (record != null) { - String topic = record.topic(); - if (topic != null) { - return topic; - } - } - return "destination"; - } - - public void onConsume(Span span, long startTimeMillis, ConsumerRecord record) { - span.setAttribute("partition", record.partition()); - span.setAttribute("offset", record.offset()); - // don't record a duration if the message was sent from an old Kafka client - if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { - long produceTime = record.timestamp(); - // this attribute shows how much time elapsed between the producer and the consumer of this - // message, which can be helpful for identifying queue bottlenecks - span.setAttribute("record.queue_time_ms", Math.max(0L, startTimeMillis - produceTime)); - } - } - - public void onProduce(Span span, ProducerRecord record) { - if (record != null) { - Integer partition = record.partition(); - if (partition != null) { - span.setAttribute("partition", partition); - } - } - } -} diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java index fafb63e364be..f4a98e9d87a3 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerInstrumentation.java @@ -6,11 +6,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import static io.opentelemetry.context.ContextUtils.withScopedContext; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaDecorator.TRACER; +import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaProducerTracer.TRACER; import static io.opentelemetry.javaagent.instrumentation.kafkaclients.TextMapInjectAdapter.SETTER; -import static io.opentelemetry.trace.Span.Kind.PRODUCER; -import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; import static io.opentelemetry.trace.TracingContextUtils.withSpan; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -22,7 +19,6 @@ import io.grpc.Context; import io.opentelemetry.OpenTelemetry; import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.instrumentation.api.SpanWithScope; import io.opentelemetry.javaagent.tooling.Instrumenter; import io.opentelemetry.trace.Span; import java.util.Map; @@ -34,7 +30,6 @@ import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.record.RecordBatch; @AutoService(Instrumenter.class) public final class KafkaProducerInstrumentation extends Instrumenter.Default { @@ -52,7 +47,7 @@ public ElementMatcher typeMatcher() { public String[] helperClassNames() { return new String[] { packageName + ".KafkaClientConfiguration", - packageName + ".KafkaDecorator", + packageName + ".KafkaProducerTracer", packageName + ".TextMapInjectAdapter", KafkaProducerInstrumentation.class.getName() + "$ProducerCallback" }; @@ -72,36 +67,25 @@ public Map, String> transfor public static class ProducerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static SpanWithScope onEnter( + public static void onEnter( @Advice.FieldValue("apiVersions") ApiVersions apiVersions, @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, - @Advice.Argument(value = 1, readOnly = false) Callback callback) { + @Advice.Argument(value = 1, readOnly = false) Callback callback, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { + Context parent = Context.current(); - Span span = - TRACER.spanBuilder(DECORATE.spanNameOnProduce(record)).setSpanKind(PRODUCER).startSpan(); - DECORATE.afterStart(span); - DECORATE.onProduce(span, record); - callback = new ProducerCallback(callback, parent, span); + span = TRACER.startProducerSpan(record); + Context newContext = withSpan(span, parent); - if (record.value() == null) { - span.setAttribute("tombstone", true); - } + callback = new ProducerCallback(callback, parent, span); - // Do not inject headers for batch versions below 2 - // This is how similar check is being done in Kafka client itself: - // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 - // Also, do not inject headers if specified by JVM option or environment variable - // This can help in mixed client environments where clients < 0.11 that do not support - // headers attempt to read messages that were produced by clients > 0.11 and the magic - // value of the broker(s) is >= 2 - if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 - && KafkaClientConfiguration.isPropagationEnabled()) { - Context context = withSpan(span, Context.current()); + if (TRACER.shouldPropagate(apiVersions)) { try { OpenTelemetry.getPropagators() .getTextMapPropagator() - .inject(context, record.headers(), SETTER); + .inject(newContext, record.headers(), SETTER); } catch (IllegalStateException e) { // headers must be read-only from reused record. try again with new one. record = @@ -115,20 +99,24 @@ record = OpenTelemetry.getPropagators() .getTextMapPropagator() - .inject(context, record.headers(), SETTER); + .inject(newContext, record.headers(), SETTER); } } - return new SpanWithScope(span, currentContextWith(span)); + scope = withScopedContext(newContext); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter SpanWithScope spanWithScope, @Advice.Thrown Throwable throwable) { - Span span = spanWithScope.getSpan(); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - spanWithScope.closeScope(); + @Advice.Thrown Throwable throwable, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { + + scope.close(); + + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } // span finished by ProducerCallback } } @@ -146,12 +134,15 @@ public ProducerCallback(Callback callback, Context parent, Span span) { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - DECORATE.onError(span, exception); - DECORATE.beforeFinish(span); - span.end(); + if (exception != null) { + TRACER.endExceptionally(span, exception); + } else { + TRACER.end(span); + } + if (callback != null) { if (parent != null) { - try (Scope scope = withScopedContext(parent)) { + try (Scope ignored = withScopedContext(parent)) { callback.onCompletion(metadata, exception); } } else { diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java new file mode 100644 index 000000000000..00c9235e94f4 --- /dev/null +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaProducerTracer.java @@ -0,0 +1,60 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkaclients; + +import static io.opentelemetry.trace.Span.Kind.PRODUCER; + +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.attributes.SemanticAttributes; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.RecordBatch; + +public class KafkaProducerTracer extends BaseTracer { + public static final KafkaProducerTracer TRACER = new KafkaProducerTracer(); + + public Span startProducerSpan(ProducerRecord record) { + Span span = startSpan(spanNameOnProduce(record), PRODUCER); + onProduce(span, record); + return span; + } + + // Do not inject headers for batch versions below 2 + // This is how similar check is being done in Kafka client itself: + // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412 + // Also, do not inject headers if specified by JVM option or environment variable + // This can help in mixed client environments where clients < 0.11 that do not support + // headers attempt to read messages that were produced by clients > 0.11 and the magic + // value of the broker(s) is >= 2 + public boolean shouldPropagate(ApiVersions apiVersions) { + return apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 + && KafkaClientConfiguration.isPropagationEnabled(); + } + + public String spanNameOnProduce(ProducerRecord record) { + return record.topic() + " send"; + } + + public void onProduce(Span span, ProducerRecord record) { + span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka"); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()); + + Integer partition = record.partition(); + if (partition != null) { + span.setAttribute("partition", partition); + } + if (record.value() == null) { + span.setAttribute("tombstone", true); + } + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.auto.kafka-clients-0.11"; + } +} diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java index 4e6d0f209a02..e181af7d6ce6 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterable.java @@ -8,24 +8,24 @@ import java.util.Iterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingIterable implements Iterable { - private final Iterable delegate; - private final KafkaDecorator decorator; +public class TracingIterable implements Iterable> { + private final Iterable> delegate; + private final KafkaConsumerTracer tracer; private boolean firstIterator = true; - public TracingIterable(Iterable delegate, KafkaDecorator decorator) { + public TracingIterable(Iterable> delegate, KafkaConsumerTracer tracer) { this.delegate = delegate; - this.decorator = decorator; + this.tracer = tracer; } @Override - public Iterator iterator() { - Iterator it; + public Iterator> iterator() { + Iterator> it; // We should only return one iterator with tracing. // However, this is not thread-safe, but usually the first (hopefully only) traversal of // ConsumerRecords is performed in the same thread that called poll() if (firstIterator) { - it = new TracingIterator(delegate.iterator(), decorator); + it = new TracingIterator(delegate.iterator(), tracer); firstIterator = false; } else { it = delegate.iterator(); diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java index 96e3b51ea4b6..53949b8c92da 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingIterator.java @@ -5,29 +5,21 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.TextMapExtractAdapter.GETTER; -import static io.opentelemetry.trace.Span.Kind.CONSUMER; import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; -import static io.opentelemetry.trace.TracingContextUtils.getSpan; -import io.grpc.Context; import io.opentelemetry.javaagent.instrumentation.api.SpanWithScope; import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.SpanContext; import java.util.Iterator; -import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TracingIterator implements Iterator { +public class TracingIterator implements Iterator> { private static final Logger log = LoggerFactory.getLogger(TracingIterator.class); - private final Iterator delegateIterator; - private final KafkaDecorator decorator; + private final Iterator> delegateIterator; + private final KafkaConsumerTracer tracer; private final boolean propagationEnabled; /** @@ -36,16 +28,17 @@ public class TracingIterator implements Iterator { */ private SpanWithScope currentSpanWithScope; - public TracingIterator(Iterator delegateIterator, KafkaDecorator decorator) { + public TracingIterator( + Iterator> delegateIterator, KafkaConsumerTracer tracer) { this.delegateIterator = delegateIterator; - this.decorator = decorator; + this.tracer = tracer; this.propagationEnabled = KafkaClientConfiguration.isPropagationEnabled(); } @Override public boolean hasNext() { if (currentSpanWithScope != null) { - currentSpanWithScope.getSpan().end(); + tracer.end(currentSpanWithScope.getSpan()); currentSpanWithScope.closeScope(); currentSpanWithScope = null; } @@ -53,42 +46,20 @@ public boolean hasNext() { } @Override - public ConsumerRecord next() { + public ConsumerRecord next() { if (currentSpanWithScope != null) { // in case they didn't call hasNext()... - currentSpanWithScope.getSpan().end(); + tracer.end(currentSpanWithScope.getSpan()); currentSpanWithScope.closeScope(); currentSpanWithScope = null; } - ConsumerRecord next = delegateIterator.next(); + ConsumerRecord next = delegateIterator.next(); try { if (next != null) { - boolean consumer = !TRACER.getCurrentSpan().getContext().isValid(); - Span.Builder spanBuilder = TRACER.spanBuilder(decorator.spanNameOnConsume(next)); - if (consumer) { - spanBuilder.setSpanKind(CONSUMER); - } - if (propagationEnabled) { - Context context = extract(next.headers(), GETTER); - SpanContext spanContext = getSpan(context).getContext(); - if (spanContext.isValid()) { - if (consumer) { - spanBuilder.setParent(context); - } else { - spanBuilder.addLink(spanContext); - } - } - } - long startTimeMillis = System.currentTimeMillis(); - spanBuilder.setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis)); - Span span = spanBuilder.startSpan(); - if (next.value() == null) { - span.setAttribute("tombstone", true); - } - decorator.afterStart(span); - decorator.onConsume(span, startTimeMillis, next); + Span span = tracer.startSpan(next); + currentSpanWithScope = new SpanWithScope(span, currentContextWith(span)); } } catch (Exception e) { diff --git a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java index 5000f8a4a8c9..afbd01447995 100644 --- a/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java +++ b/instrumentation/kafka-clients-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/TracingList.java @@ -10,10 +10,10 @@ import java.util.ListIterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingList extends TracingIterable implements List { - private final List delegate; +public class TracingList extends TracingIterable implements List> { + private final List> delegate; - public TracingList(List delegate, KafkaDecorator decorator) { + public TracingList(List> delegate, KafkaConsumerTracer decorator) { super(delegate, decorator); this.delegate = delegate; } @@ -59,12 +59,12 @@ public boolean containsAll(Collection c) { } @Override - public boolean addAll(Collection c) { + public boolean addAll(Collection> c) { return delegate.addAll(c); } @Override - public boolean addAll(int index, Collection c) { + public boolean addAll(int index, Collection> c) { return delegate.addAll(index, c); } @@ -84,13 +84,13 @@ public void clear() { } @Override - public ConsumerRecord get(int index) { + public ConsumerRecord get(int index) { // TODO: should this be instrumented as well? return delegate.get(index); } @Override - public ConsumerRecord set(int index, ConsumerRecord element) { + public ConsumerRecord set(int index, ConsumerRecord element) { return delegate.set(index, element); } @@ -100,7 +100,7 @@ public void add(int index, ConsumerRecord element) { } @Override - public ConsumerRecord remove(int index) { + public ConsumerRecord remove(int index) { return delegate.remove(index); } @@ -115,21 +115,21 @@ public int lastIndexOf(Object o) { } @Override - public ListIterator listIterator() { + public ListIterator> listIterator() { // TODO: the API for ListIterator is not really good to instrument it in context of Kafka // Consumer so we will not do that for now return delegate.listIterator(); } @Override - public ListIterator listIterator(int index) { + public ListIterator> listIterator(int index) { // TODO: the API for ListIterator is not really good to instrument it in context of Kafka // Consumer so we will not do that for now return delegate.listIterator(index); } @Override - public List subList(int fromIndex, int toIndex) { + public List> subList(int fromIndex, int toIndex) { // TODO: the API for subList is not really good to instrument it in context of Kafka // Consumer so we will not do that for now // Kafka is essentially a sequential commit log. We should only enable tracing when traversing diff --git a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index 7d4ba983fb5d..2934e46b4a06 100644 --- a/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -3,17 +3,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.setConfig -import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.updateConfig -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace -import static io.opentelemetry.trace.Span.Kind.CONSUMER -import static io.opentelemetry.trace.Span.Kind.PRODUCER - -import io.opentelemetry.instrumentation.test.AgentTestRunner import io.opentelemetry.instrumentation.api.config.Config -import java.util.concurrent.LinkedBlockingQueue -import java.util.concurrent.TimeUnit +import io.opentelemetry.instrumentation.test.AgentTestRunner +import io.opentelemetry.trace.attributes.SemanticAttributes import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer @@ -33,6 +25,16 @@ import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils import spock.lang.Unroll +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit + +import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.setConfig +import static io.opentelemetry.instrumentation.test.utils.ConfigUtils.updateConfig +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace +import static io.opentelemetry.trace.Span.Kind.CONSUMER +import static io.opentelemetry.trace.Span.Kind.PRODUCER + class KafkaClientTest extends AgentTestRunner { static final SHARED_TOPIC = "shared.topic" @@ -96,19 +98,27 @@ class KafkaClientTest extends AgentTestRunner { trace(0, 4) { basicSpan(it, 0, "parent") span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } span(2) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(1) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -178,19 +188,27 @@ class KafkaClientTest extends AgentTestRunner { trace(0, 4) { basicSpan(it, 0, "parent") span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } span(2) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(1) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -253,21 +271,29 @@ class KafkaClientTest extends AgentTestRunner { trace(0, 2) { // PRODUCER span 0 span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "tombstone" true } } // CONSUMER span 0 span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -320,20 +346,28 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(1) { trace(0, 2) { span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "partition" { it >= 0 } } } span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -417,11 +451,14 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(1) { trace(0, 1) { span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } } @@ -439,19 +476,27 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(1) { trace(0, 2) { span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -473,19 +518,27 @@ class KafkaClientTest extends AgentTestRunner { assertTraces(2) { trace(0, 2) { span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } span(1) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -494,11 +547,16 @@ class KafkaClientTest extends AgentTestRunner { } trace(1, 1) { span(0) { - name SHARED_TOPIC + name SHARED_TOPIC + " process" kind CONSUMER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDecorator.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDecorator.java deleted file mode 100644 index cea637fbd9ed..000000000000 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDecorator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkastreams; - -import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.instrumentation.api.decorator.ClientDecorator; -import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.Tracer; -import org.apache.kafka.streams.processor.internals.StampedRecord; - -public class KafkaStreamsDecorator extends ClientDecorator { - public static final KafkaStreamsDecorator CONSUMER_DECORATE = new KafkaStreamsDecorator(); - - public static final Tracer TRACER = - OpenTelemetry.getTracer("io.opentelemetry.auto.kafka-streams-0.11"); - - public String spanNameForConsume(StampedRecord record) { - if (record == null) { - return null; - } - String topic = record.topic(); - if (topic != null) { - return topic; - } else { - return "destination"; - } - } - - public void onConsume(Span span, StampedRecord record) { - if (record != null) { - span.setAttribute("partition", record.partition()); - span.setAttribute("offset", record.offset()); - } - } -} diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsProcessorInstrumentation.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsProcessorInstrumentation.java index 3bdabf530461..8ed92721210e 100644 --- a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsProcessorInstrumentation.java +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsProcessorInstrumentation.java @@ -5,12 +5,8 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsDecorator.CONSUMER_DECORATE; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsDecorator.TRACER; import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsProcessorInstrumentation.SpanScopeHolder.HOLDER; -import static io.opentelemetry.javaagent.instrumentation.kafkastreams.TextMapExtractAdapter.GETTER; -import static io.opentelemetry.trace.Span.Kind.CONSUMER; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsTracer.TRACER; import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -64,7 +60,7 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".KafkaStreamsDecorator", + packageName + ".KafkaStreamsTracer", packageName + ".TextMapExtractAdapter", KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder" }; @@ -94,12 +90,7 @@ public static void onExit(@Advice.Return StampedRecord record) { return; } - Span.Builder spanBuilder = - TRACER.spanBuilder(CONSUMER_DECORATE.spanNameForConsume(record)).setSpanKind(CONSUMER); - spanBuilder.setParent(extract(record.value.headers(), GETTER)); - Span span = spanBuilder.startSpan(); - CONSUMER_DECORATE.afterStart(span); - CONSUMER_DECORATE.onConsume(span, record); + Span span = TRACER.startSpan(record); holder.setSpanWithScope(new SpanWithScope(span, currentContextWith(span))); } @@ -121,7 +112,7 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".KafkaStreamsDecorator", + packageName + ".KafkaStreamsTracer", packageName + ".TextMapExtractAdapter", KafkaStreamsProcessorInstrumentation.class.getName() + "$SpanScopeHolder" }; @@ -149,11 +140,15 @@ public static void stopSpan( HOLDER.remove(); SpanWithScope spanWithScope = holder.getSpanWithScope(); if (spanWithScope != null) { - Span span = spanWithScope.getSpan(); - CONSUMER_DECORATE.onError(span, throwable); - CONSUMER_DECORATE.beforeFinish(span); - span.end(); spanWithScope.closeScope(); + + Span span = spanWithScope.getSpan(); + + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); + } } } } diff --git a/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java new file mode 100644 index 000000000000..b7842a26f267 --- /dev/null +++ b/instrumentation/kafka-streams-0.11/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsTracer.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kafkastreams; + +import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; +import static io.opentelemetry.javaagent.instrumentation.kafkastreams.TextMapExtractAdapter.GETTER; + +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.Span.Kind; +import io.opentelemetry.trace.attributes.SemanticAttributes; +import org.apache.kafka.streams.processor.internals.StampedRecord; + +public class KafkaStreamsTracer extends BaseTracer { + public static final KafkaStreamsTracer TRACER = new KafkaStreamsTracer(); + + public Span startSpan(StampedRecord record) { + Span span = + tracer + .spanBuilder(spanNameForConsume(record)) + .setSpanKind(Kind.CONSUMER) + .setParent(extract(record.value.headers(), GETTER)) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "kafka") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, record.topic()) + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .startSpan(); + onConsume(span, record); + return span; + } + + public String spanNameForConsume(StampedRecord record) { + if (record == null) { + return null; + } + return record.topic() + " process"; + } + + public void onConsume(Span span, StampedRecord record) { + if (record != null) { + span.setAttribute("partition", record.partition()); + span.setAttribute("offset", record.offset()); + } + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.auto.kafka-streams-0.11"; + } +} diff --git a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy index cf351f8cd0b2..f272362e8ad4 100644 --- a/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy +++ b/instrumentation/kafka-streams-0.11/src/test/groovy/KafkaStreamsTest.groovy @@ -10,6 +10,7 @@ import static io.opentelemetry.trace.TracingContextUtils.getSpan import io.grpc.Context import io.opentelemetry.instrumentation.test.AgentTestRunner import io.opentelemetry.context.propagation.TextMapPropagator +import io.opentelemetry.trace.attributes.SemanticAttributes import io.opentelemetry.trace.propagation.HttpTraceContext import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -126,20 +127,28 @@ class KafkaStreamsTest extends AgentTestRunner { trace(0, 5) { // PRODUCER span 0 span(0) { - name STREAM_PENDING + name STREAM_PENDING + " send" kind PRODUCER errored false hasNoParent() attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } // CONSUMER span 0 span(1) { - name STREAM_PENDING + name STREAM_PENDING + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } @@ -147,11 +156,15 @@ class KafkaStreamsTest extends AgentTestRunner { } // STREAMING span 1 span(2) { - name STREAM_PENDING + name STREAM_PENDING + " process" kind CONSUMER errored false childOf span(0) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PENDING + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "partition" { it >= 0 } "offset" 0 "asdf" "testing" @@ -159,20 +172,28 @@ class KafkaStreamsTest extends AgentTestRunner { } // STREAMING span 0 span(3) { - name STREAM_PROCESSED + name STREAM_PROCESSED + " send" kind PRODUCER errored false childOf span(2) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } // CONSUMER span 0 span(4) { - name STREAM_PROCESSED + name STREAM_PROCESSED + " process" kind CONSUMER errored false childOf span(3) attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" STREAM_PROCESSED + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long "partition" { it >= 0 } "offset" 0 "record.queue_time_ms" { it >= 0 } diff --git a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java index d5f1d9e17155..860128741955 100644 --- a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitChannelInstrumentation.java @@ -5,19 +5,12 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.TextMapExtractAdapter.GETTER; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitTracer.TRACER; import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.TextMapInjectAdapter.SETTER; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; import static io.opentelemetry.javaagent.tooling.matcher.NameMatchers.namedOneOf; -import static io.opentelemetry.trace.Span.Kind.CLIENT; -import static io.opentelemetry.trace.Span.Kind.PRODUCER; -import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; -import static io.opentelemetry.trace.TracingContextUtils.getSpan; import static io.opentelemetry.trace.TracingContextUtils.withSpan; import static net.bytebuddy.matcher.ElementMatchers.canThrow; import static net.bytebuddy.matcher.ElementMatchers.isGetter; @@ -33,23 +26,20 @@ import com.google.auto.service.AutoService; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.MessageProperties; import io.grpc.Context; import io.opentelemetry.OpenTelemetry; +import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap; -import io.opentelemetry.javaagent.instrumentation.api.SpanWithScope; import io.opentelemetry.javaagent.tooling.Instrumenter; import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.SpanContext; import io.opentelemetry.trace.attributes.SemanticAttributes; import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -76,7 +66,7 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".RabbitDecorator", + packageName + ".RabbitTracer", packageName + ".TextMapInjectAdapter", packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", @@ -115,45 +105,41 @@ public Map, String> transfor return transformers; } + // TODO Why do we start span here and not in ChannelPublishAdvice below? public static class ChannelMethodAdvice { @Advice.OnMethodEnter - public static SpanWithScope onEnter( - @Advice.This Channel channel, @Advice.Origin("Channel.#m") String method) { + public static void onEnter( + @Advice.This Channel channel, + @Advice.Origin("Channel.#m") String method, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { int callDepth = CallDepthThreadLocalMap.incrementCallDepth(Channel.class); if (callDepth > 0) { - return null; + return; } - Connection connection = channel.getConnection(); - - Span.Builder spanBuilder = TRACER.spanBuilder(method); - if (method.equals("Channel.basicPublish")) { - spanBuilder.setSpanKind(PRODUCER); - } else { - spanBuilder.setSpanKind(CLIENT); - } - Span span = spanBuilder.startSpan(); - span.setAttribute(SemanticAttributes.NET_PEER_PORT, (long) connection.getPort()); - DECORATE.afterStart(span); - DECORATE.onPeerConnection(span, connection.getAddress()); + span = TRACER.startSpan(method, channel.getConnection()); CURRENT_RABBIT_SPAN.set(span); - return new SpanWithScope(span, currentContextWith(span)); + scope = TRACER.startScope(span); } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( - @Advice.Enter SpanWithScope spanWithScope, @Advice.Thrown Throwable throwable) { - if (spanWithScope == null) { + @Advice.Thrown Throwable throwable, + @Advice.Local("otelSpan") Span span, + @Advice.Local("otelScope") Scope scope) { + if (scope == null) { return; } + scope.close(); CallDepthThreadLocalMap.reset(Channel.class); CURRENT_RABBIT_SPAN.remove(); - Span span = spanWithScope.getSpan(); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.end(); - spanWithScope.closeScope(); + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); + } } } @@ -167,9 +153,11 @@ public static void setSpanNameAddHeaders( Span span = TRACER.getCurrentSpan(); if (span.getContext().isValid()) { - DECORATE.afterStart(span); // Overwrite tags set by generic decorator. - DECORATE.onPublish(span, exchange, routingKey); - span.setAttribute("message.size", body == null ? 0 : body.length); + TRACER.onPublish(span, exchange, routingKey); + if (body != null) { + span.setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length); + } // This is the internal behavior when props are null. We're just doing it earlier now. if (props == null) { @@ -224,45 +212,19 @@ public static void extractAndStartSpan( @Advice.Local("callDepth") int callDepth, @Advice.Return GetResponse response, @Advice.Thrown Throwable throwable) { - if (callDepth > 0) { return; } CallDepthThreadLocalMap.reset(Channel.class); - // can't create span and put into scope in method enter above, because can't add links after + // can't create span and put into scope in method enter above, because can't add parent after // span creation - Span.Builder spanBuilder = - TRACER - .spanBuilder(DECORATE.spanNameOnGet(queue)) - .setSpanKind(CLIENT) - .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime)); - - if (response != null && response.getProps() != null) { - Map headers = response.getProps().getHeaders(); - - if (headers != null) { - Context context = extract(headers, GETTER); - SpanContext spanContext = getSpan(context).getContext(); - if (spanContext.isValid()) { - spanBuilder.addLink(spanContext); - } - } - } - - Connection connection = channel.getConnection(); - - Span span = spanBuilder.startSpan(); - if (response != null) { - span.setAttribute("message.size", response.getBody().length); + Span span = TRACER.startGetSpan(queue, startTime, response, channel.getConnection()); + if (throwable != null) { + TRACER.endExceptionally(span, throwable); + } else { + TRACER.end(span); } - span.setAttribute(SemanticAttributes.NET_PEER_PORT, (long) connection.getPort()); - DECORATE.afterStart(span); - DECORATE.onGet(span, queue); - DECORATE.onPeerConnection(span, connection.getAddress()); - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.end(); } } diff --git a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java index c933bd349986..b1e260a31ca9 100644 --- a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java +++ b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitCommandInstrumentation.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp; import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitCommandInstrumentation.SpanHolder.CURRENT_RABBIT_SPAN; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitTracer.TRACER; import static io.opentelemetry.javaagent.tooling.ClassLoaderMatcher.hasClassesNamed; import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; import static java.util.Collections.singletonMap; @@ -44,7 +44,7 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".RabbitDecorator", + packageName + ".RabbitTracer", // These are only used by muzzleCheck: packageName + ".TextMapExtractAdapter", packageName + ".TracedDelegatingConsumer", @@ -69,7 +69,7 @@ public static void setSpanNameAddHeaders(@Advice.This Command command) { Span span = CURRENT_RABBIT_SPAN.get(); if (span != null && command.getMethod() != null) { - DECORATE.onCommand(span, command); + TRACER.onCommand(span, command); } } diff --git a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitDecorator.java b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitDecorator.java deleted file mode 100644 index bccb0c1d076a..000000000000 --- a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitDecorator.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp; - -import com.rabbitmq.client.Command; -import com.rabbitmq.client.Envelope; -import io.opentelemetry.OpenTelemetry; -import io.opentelemetry.instrumentation.api.decorator.ClientDecorator; -import io.opentelemetry.trace.Span; -import io.opentelemetry.trace.Tracer; - -public class RabbitDecorator extends ClientDecorator { - - public static final RabbitDecorator DECORATE = new RabbitDecorator(); - - public static final Tracer TRACER = - OpenTelemetry.getTracer("io.opentelemetry.auto.rabbitmq-amqp-2.7"); - - public void onPublish(Span span, String exchange, String routingKey) { - String exchangeName = exchange == null || exchange.isEmpty() ? "" : exchange; - String routing = - routingKey == null || routingKey.isEmpty() - ? "" - : routingKey.startsWith("amq.gen-") ? "" : routingKey; - span.updateName(exchangeName + " -> " + routing); - span.setAttribute("amqp.command", "basic.publish"); - if (exchange != null && !exchange.isEmpty()) { - span.setAttribute("amqp.exchange", exchange); - } - if (routingKey != null && !routingKey.isEmpty()) { - span.setAttribute("amqp.routing_key", routingKey); - } - } - - public String spanNameOnGet(String queue) { - return queue.startsWith("amq.gen-") ? "" : queue; - } - - public void onGet(Span span, String queue) { - span.setAttribute("amqp.command", "basic.get"); - span.setAttribute("amqp.queue", queue); - } - - public String spanNameOnDeliver(String queue) { - if (queue == null || queue.isEmpty()) { - return ""; - } else if (queue.startsWith("amq.gen-")) { - return ""; - } else { - return queue; - } - } - - public void onDeliver(Span span, Envelope envelope) { - span.setAttribute("amqp.command", "basic.deliver"); - - if (envelope != null) { - String exchange = envelope.getExchange(); - if (exchange != null && !exchange.isEmpty()) { - span.setAttribute("amqp.exchange", exchange); - } - String routingKey = envelope.getRoutingKey(); - if (routingKey != null && !routingKey.isEmpty()) { - span.setAttribute("amqp.routing_key", routingKey); - } - } - } - - public void onCommand(Span span, Command command) { - String name = command.getMethod().protocolMethodName(); - - if (!name.equals("basic.publish")) { - span.updateName(name); - } - span.setAttribute("amqp.command", name); - } -} diff --git a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitTracer.java b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitTracer.java new file mode 100644 index 000000000000..df95be770dcd --- /dev/null +++ b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/RabbitTracer.java @@ -0,0 +1,164 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp; + +import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.TextMapExtractAdapter.GETTER; +import static io.opentelemetry.trace.Span.Kind.CLIENT; +import static io.opentelemetry.trace.Span.Kind.CONSUMER; +import static io.opentelemetry.trace.Span.Kind.PRODUCER; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Command; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.GetResponse; +import io.opentelemetry.instrumentation.api.tracer.BaseTracer; +import io.opentelemetry.instrumentation.api.tracer.utils.NetPeerUtils; +import io.opentelemetry.trace.Span; +import io.opentelemetry.trace.attributes.SemanticAttributes; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class RabbitTracer extends BaseTracer { + + public static final RabbitTracer TRACER = new RabbitTracer(); + + public Span startSpan(String method, Connection connection) { + Span.Kind kind = method.equals("Channel.basicPublish") ? PRODUCER : CLIENT; + Span span = startSpan(method, kind); + span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq"); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue"); + + NetPeerUtils.setNetPeer(span, connection.getAddress(), connection.getPort()); + + return span; + } + + public Span startGetSpan( + String queue, long startTime, GetResponse response, Connection connection) { + Span.Builder spanBuilder = + tracer + .spanBuilder(spanNameOnGet(queue)) + .setSpanKind(CLIENT) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive") + .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTime)); + + Span span = spanBuilder.startSpan(); + if (response != null) { + span.setAttribute( + SemanticAttributes.MESSAGING_DESTINATION, + normalizeExchangeName(response.getEnvelope().getExchange())); + span.setAttribute("messaging.rabbitmq.routing_key", response.getEnvelope().getRoutingKey()); + span.setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + (long) response.getBody().length); + } + NetPeerUtils.setNetPeer(span, connection.getAddress(), connection.getPort()); + onGet(span, queue); + + return span; + } + + public Span startDeliverySpan( + String queue, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + Map headers = properties.getHeaders(); + long startTimeMillis = System.currentTimeMillis(); + Span span = + tracer + .spanBuilder(spanNameOnDeliver(queue)) + .setSpanKind(CONSUMER) + .setParent(extract(headers, GETTER)) + .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis)) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rabbitmq") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "queue") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .startSpan(); + onDeliver(span, envelope); + + if (body != null) { + span.setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length); + } + if (properties.getTimestamp() != null) { + // this will be set if the sender sets the timestamp, + // or if a plugin is installed on the rabbitmq broker + long produceTime = properties.getTimestamp().getTime(); + long consumeTime = NANOSECONDS.toMillis(startTimeMillis); + span.setAttribute("record.queue_time_ms", Math.max(0L, consumeTime - produceTime)); + } + + return span; + } + + public void onPublish(Span span, String exchange, String routingKey) { + String exchangeName = normalizeExchangeName(exchange); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, exchangeName); + String routing = + routingKey == null || routingKey.isEmpty() + ? "" + : routingKey.startsWith("amq.gen-") ? "" : routingKey; + span.updateName(exchangeName + " -> " + routing + " send"); + span.setAttribute("amqp.command", "basic.publish"); + if (routingKey != null && !routingKey.isEmpty()) { + span.setAttribute("messaging.rabbitmq.routing_key", routingKey); + span.setAttribute("amqp.routing_key", routingKey); + } + } + + public String spanNameOnGet(String queue) { + return (queue.startsWith("amq.gen-") ? "" : queue) + " receive"; + } + + public void onGet(Span span, String queue) { + span.setAttribute("amqp.command", "basic.get"); + span.setAttribute("amqp.queue", queue); + } + + public String spanNameOnDeliver(String queue) { + if (queue == null || queue.isEmpty()) { + return " process"; + } else if (queue.startsWith("amq.gen-")) { + return " process"; + } else { + return queue + " process"; + } + } + + public void onDeliver(Span span, Envelope envelope) { + span.setAttribute("amqp.command", "basic.deliver"); + + if (envelope != null) { + String exchange = envelope.getExchange(); + span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, normalizeExchangeName(exchange)); + String routingKey = envelope.getRoutingKey(); + if (routingKey != null && !routingKey.isEmpty()) { + span.setAttribute("messaging.rabbitmq.routing_key", routingKey); + } + } + } + + private String normalizeExchangeName(String exchange) { + return exchange == null || exchange.isEmpty() ? "" : exchange; + } + + public void onCommand(Span span, Command command) { + String name = command.getMethod().protocolMethodName(); + + if (!name.equals("basic.publish")) { + span.updateName(name); + } + span.setAttribute("amqp.command", name); + } + + @Override + protected String getInstrumentationName() { + return "io.opentelemetry.auto.rabbitmq-amqp-2.7"; + } +} diff --git a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java index 947cad9c065f..44b4f9e28a2a 100644 --- a/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java +++ b/instrumentation/rabbitmq-2.7/src/main/java/io/opentelemetry/javaagent/instrumentation/rabbitmq/amqp/TracedDelegatingConsumer.java @@ -5,13 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp; -import static io.opentelemetry.instrumentation.api.decorator.BaseDecorator.extract; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitDecorator.DECORATE; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitDecorator.TRACER; -import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.TextMapExtractAdapter.GETTER; -import static io.opentelemetry.trace.Span.Kind.CONSUMER; -import static io.opentelemetry.trace.TracingContextUtils.currentContextWith; -import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static io.opentelemetry.javaagent.instrumentation.rabbitmq.amqp.RabbitTracer.TRACER; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; @@ -20,8 +14,6 @@ import io.opentelemetry.context.Scope; import io.opentelemetry.trace.Span; import java.io.IOException; -import java.util.Map; -import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,46 +65,29 @@ public void handleDelivery( Span span = null; Scope scope = null; try { - Map headers = properties.getHeaders(); - long startTimeMillis = System.currentTimeMillis(); - span = - TRACER - .spanBuilder(DECORATE.spanNameOnDeliver(queue)) - .setSpanKind(CONSUMER) - .setParent(extract(headers, GETTER)) - .setAttribute("message.size", body == null ? 0 : body.length) - .setStartTimestamp(TimeUnit.MILLISECONDS.toNanos(startTimeMillis)) - .startSpan(); - DECORATE.afterStart(span); - DECORATE.onDeliver(span, envelope); - - if (properties.getTimestamp() != null) { - // this will be set if the sender sets the timestamp, - // or if a plugin is installed on the rabbitmq broker - long produceTime = properties.getTimestamp().getTime(); - long consumeTime = NANOSECONDS.toMillis(startTimeMillis); - span.setAttribute("record.queue_time_ms", Math.max(0L, consumeTime - produceTime)); - } - - scope = currentContextWith(span); + span = TRACER.startDeliverySpan(queue, envelope, properties, body); + scope = TRACER.startScope(span); } catch (Exception e) { log.debug("Instrumentation error in tracing consumer", e); } finally { + // TODO this is very unusual code structure for this repo + // We have to review it try { - // Call delegate. delegate.handleDelivery(consumerTag, envelope, properties, body); + if (span != null) { + TRACER.end(span); + } } catch (Throwable throwable) { if (span != null) { - DECORATE.onError(span, throwable); + TRACER.endExceptionally(span, throwable); } + throw throwable; } finally { if (scope != null) { - DECORATE.beforeFinish(span); - span.end(); scope.close(); } } diff --git a/instrumentation/rabbitmq-2.7/src/test/groovy/RabbitMQTest.groovy b/instrumentation/rabbitmq-2.7/src/test/groovy/RabbitMQTest.groovy index 97ddb432ccb5..1598886d2e62 100644 --- a/instrumentation/rabbitmq-2.7/src/test/groovy/RabbitMQTest.groovy +++ b/instrumentation/rabbitmq-2.7/src/test/groovy/RabbitMQTest.groovy @@ -30,11 +30,8 @@ import org.springframework.amqp.rabbit.connection.CachingConnectionFactory import org.springframework.amqp.rabbit.core.RabbitAdmin import org.springframework.amqp.rabbit.core.RabbitTemplate import org.testcontainers.containers.GenericContainer -import spock.lang.Requires import spock.lang.Shared -// Do not run tests on Java7 since testcontainers are not compatible with Java7 -@Requires({ jvm.java8Compatible }) class RabbitMQTest extends AgentTestRunner { /* @@ -107,11 +104,11 @@ class RabbitMQTest extends AgentTestRunner { attributes { } } - rabbitSpan(it, 1, "exchange.declare", span(0)) - rabbitSpan(it, 2, "queue.declare", span(0)) - rabbitSpan(it, 3, "queue.bind", span(0)) - rabbitSpan(it, 4, "$exchangeName -> $routingKey", span(0)) - rabbitSpan(it, 5, "", span(0), span(4)) + rabbitSpan(it, 1, null, null, null, "exchange.declare", span(0)) + rabbitSpan(it, 2, null, null, null, "queue.declare", span(0)) + rabbitSpan(it, 3, null, null, null, "queue.bind", span(0)) + rabbitSpan(it, 4, exchangeName, routingKey, "send", "$exchangeName -> $routingKey", span(0)) + rabbitSpan(it, 5, exchangeName, routingKey, "receive", "", span(0)) } } @@ -132,13 +129,13 @@ class RabbitMQTest extends AgentTestRunner { and: assertTraces(3) { trace(0, 1) { - rabbitSpan(it, 0, "queue.declare") + rabbitSpan(it, 0, null, null, null, "queue.declare") } trace(1, 1) { - rabbitSpan(it, 0, " -> ") + rabbitSpan(it, 0, "", null, "send", " -> ") } trace(2, 1) { - rabbitSpan(it, 0, "", null, traces[1][0]) + rabbitSpan(it, 0, "", null, "receive", "", null) } } } @@ -176,21 +173,21 @@ class RabbitMQTest extends AgentTestRunner { expect: assertTraces(4 + messageCount) { trace(0, 1) { - rabbitSpan(it, "exchange.declare") + rabbitSpan(it, null, null, null, "exchange.declare") } trace(1, 1) { - rabbitSpan(it, "queue.declare") + rabbitSpan(it, null, null, null, "queue.declare") } trace(2, 1) { - rabbitSpan(it, "queue.bind") + rabbitSpan(it, null, null, null, "queue.bind") } trace(3, 1) { - rabbitSpan(it, "basic.consume") + rabbitSpan(it, null, null, null, "basic.consume") } (1..messageCount).each { trace(3 + it, 2) { - rabbitSpan(it, 0, "$exchangeName -> ") - rabbitSpan(it, 1, resource, span(0), null, null, null, setTimestamp) + rabbitSpan(it, 0, exchangeName, null, "send", "$exchangeName -> ") + rabbitSpan(it, 1, exchangeName, null, "process", resource, span(0), null, null, null, setTimestamp) } } } @@ -231,20 +228,20 @@ class RabbitMQTest extends AgentTestRunner { expect: assertTraces(5) { trace(0, 1) { - rabbitSpan(it, "exchange.declare") + rabbitSpan(it, null, null, null, "exchange.declare") } trace(1, 1) { - rabbitSpan(it, "queue.declare") + rabbitSpan(it, null, null, null, "queue.declare") } trace(2, 1) { - rabbitSpan(it, "queue.bind") + rabbitSpan(it, null, null, null, "queue.bind") } trace(3, 1) { - rabbitSpan(it, "basic.consume") + rabbitSpan(it, null, null, null, "basic.consume") } trace(4, 2) { - rabbitSpan(it, 0, "$exchangeName -> ") - rabbitSpan(it, 1, "", span(0), null, error, error.message) + rabbitSpan(it, 0, exchangeName, null, "send", "$exchangeName -> ") + rabbitSpan(it, 1, exchangeName, null, "process", "", span(0), null, error, error.message) } } @@ -263,19 +260,19 @@ class RabbitMQTest extends AgentTestRunner { assertTraces(1) { trace(0, 1) { - rabbitSpan(it, command, null, null, throwable, errorMsg) + rabbitSpan(it, null, null, operation, command, null, null, throwable, errorMsg) } } where: - command | exception | errorMsg | closure - "exchange.declare" | IOException | null | { + command | exception | errorMsg | operation | closure + "exchange.declare" | IOException | null | null | { it.exchangeDeclare("some-exchange", "invalid-type", true) } - "Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | { + "Channel.basicConsume" | IllegalStateException | "Invalid configuration: 'queue' must be non-null." | null | { it.basicConsume(null, null) } - "" | IOException | null | { + "" | IOException | null | "receive" | { it.basicGet("amq.gen-invalid-channel", true) } } @@ -296,19 +293,22 @@ class RabbitMQTest extends AgentTestRunner { and: assertTraces(3) { trace(0, 1) { - rabbitSpan(it, "queue.declare") + rabbitSpan(it, null, null, null, "queue.declare") } trace(1, 1) { - rabbitSpan(it, 0, " -> some-routing-queue") + rabbitSpan(it, 0, "", "some-routing-queue", "send", " -> some-routing-queue") } trace(2, 1) { - rabbitSpan(it, 0, queue.name, null, traces[1][0]) + rabbitSpan(it, 0, "", "some-routing-queue", "receive", queue.name, null) } } } def rabbitSpan( TraceAssert trace, + String exchange, + String routingKey, + String operation, String resource, Object parentSpan = null, Object linkSpan = null, @@ -316,12 +316,15 @@ class RabbitMQTest extends AgentTestRunner { String errorMsg = null, Boolean expectTimestamp = false ) { - rabbitSpan(trace, 0, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp) + rabbitSpan(trace, 0, exchange, routingKey, operation, resource, parentSpan, linkSpan, exception, errorMsg, expectTimestamp) } def rabbitSpan( TraceAssert trace, int index, + String exchange, + String routingKey, + String operation, String resource, Object parentSpan = null, Object linkSpan = null, @@ -329,8 +332,14 @@ class RabbitMQTest extends AgentTestRunner { String errorMsg = null, Boolean expectTimestamp = false ) { + + def spanName = resource + if (operation != null) { + spanName = spanName + " " + operation + } + trace.span(index) { - name resource + name spanName switch (trace.span(index).attributes.get(AttributeKey.stringKey("amqp.command"))) { case "basic.publish": @@ -362,9 +371,18 @@ class RabbitMQTest extends AgentTestRunner { } attributes { - "${SemanticAttributes.NET_PEER_NAME.key()}" { it == null || it instanceof String } - "${SemanticAttributes.NET_PEER_IP.key()}" { "127.0.0.1" } - "${SemanticAttributes.NET_PEER_PORT.key()}" { it == null || it instanceof Long } + "${SemanticAttributes.NET_PEER_NAME.key}" { it == null || it instanceof String } + "${SemanticAttributes.NET_PEER_IP.key}" { "127.0.0.1" } + "${SemanticAttributes.NET_PEER_PORT.key}" { it == null || it instanceof Long } + + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rabbitmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" exchange + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "queue" + //TODO add to SemanticAttributes + "messaging.rabbitmq.routing_key" { it == null || it == routingKey || it.startsWith("amq.gen-") } + if (operation != null && operation != "send") { + "${SemanticAttributes.MESSAGING_OPERATION.key}" operation + } if (expectTimestamp) { "record.queue_time_ms" { it instanceof Long && it >= 0 } } @@ -372,22 +390,21 @@ class RabbitMQTest extends AgentTestRunner { switch (attribute("amqp.command")) { case "basic.publish": "amqp.command" "basic.publish" - "amqp.exchange" { it == null || it == "some-exchange" || it == "some-error-exchange" } "amqp.routing_key" { it == null || it == "some-routing-key" || it == "some-routing-queue" || it.startsWith("amq.gen-") } "amqp.delivery_mode" { it == null || it == 2 } - "message.size" Long + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long break case "basic.get": "amqp.command" "basic.get" + //TODO why this queue name is not a destination for semantic convention "amqp.queue" { it == "some-queue" || it == "some-routing-queue" || it.startsWith("amq.gen-") } - "message.size" { it == null || it instanceof Long } + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" { it == null || it instanceof Long } break case "basic.deliver": "amqp.command" "basic.deliver" - "amqp.exchange" { it == "some-exchange" || it == "some-error-exchange" } - "message.size" Long + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long break default: "amqp.command" { it == null || it == resource }