Skip to content

Commit

Permalink
Fix JMS context propagation problems (#2702)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored Apr 6, 2021
1 parent 53ffe9f commit 5760cd7
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0,{})
assertTraces(0, {})

cleanup:
consumer.close()
Expand Down Expand Up @@ -194,6 +194,47 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTopic("someTopic") | "topic" | "someTopic"
}
def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}
when:
producer.send(destination, message)
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationName + " send"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public static void onEnter(
MessageDestination messageDestination =
tracer().extractDestination(message, defaultDestination);
context = tracer().startProducerSpan(messageDestination, message);
// TODO: why are we propagating context only in this advice class? the other one does not
// inject current span context into JMS message
scope = tracer().startProducerScope(context, message);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -90,9 +88,9 @@ public static void stopSpan(
if (scope == null) {
return;
}
scope.close();
CallDepthThreadLocalMap.reset(MessageProducer.class);

scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
Expand Down Expand Up @@ -129,6 +127,7 @@ public static void stopSpan(
}
CallDepthThreadLocalMap.reset(MessageProducer.class);

scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER;
import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -43,12 +41,7 @@ public Context startConsumerSpan(
MessageDestination destination, String operation, Message message, long startTime) {
Context parentContext = Context.root();
if (message != null && "process".equals(operation)) {
// TODO use BaseTracer.extract() which has context leak detection
// (and fix the context leak that it is currently detecting when running Jms2Test)
parentContext =
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.extract(Context.root(), message, GETTER);
parentContext = extract(message, GETTER);
}

SpanBuilder spanBuilder =
Expand All @@ -64,12 +57,9 @@ public Context startProducerSpan(MessageDestination destination, Message message
Context parentContext = Context.current();
SpanBuilder span = spanBuilder(parentContext, spanName(destination, "send"), PRODUCER);
afterStart(span, destination, message);
return parentContext.with(span.startSpan());
}

public Scope startProducerScope(Context context, Message message) {
Context context = parentContext.with(span.startSpan());
inject(context, message, SETTER);
return context.makeCurrent();
return context;
}

public String spanName(MessageDestination destination, String operation) {
Expand Down
43 changes: 43 additions & 0 deletions instrumentation/jms-1.1/javaagent/src/test/groovy/Jms1Test.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared
import spock.lang.Unroll

@Unroll
class Jms1Test extends AgentInstrumentationSpecification {

private static final Logger logger = LoggerFactory.getLogger(Jms1Test)
Expand Down Expand Up @@ -224,6 +226,47 @@ class Jms1Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}
def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)
def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}
when:
producer.send(destination, message)
lock.countDown()
then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText
cleanup:
producer.close()
consumer.close()
where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}
static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationName + " send"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ protected Boolean computeValue(Class<?> taskClass) {
if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) {
return false;
}

// Avoid instrumenting internal OrderedExecutor worker class
if (enclosingClass
.getName()
.equals("org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor")) {
return false;
}
}

return true;
Expand Down

0 comments on commit 5760cd7

Please sign in to comment.