Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JMS context propagation problems #2702

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class Jms2Test extends AgentInstrumentationSpecification {
expect:
receivedMessage == null
// span is not created if no message is received
assertTraces(0,{})
assertTraces(0, {})

cleanup:
consumer.close()
Expand Down Expand Up @@ -194,6 +194,47 @@ class Jms2Test extends AgentInstrumentationSpecification {
session.createTopic("someTopic") | "topic" | "someTopic"
}

def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)

def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}

when:
producer.send(destination, message)
lock.countDown()

then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText

cleanup:
producer.close()
consumer.close()

where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationName + " send"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public static void onEnter(
MessageDestination messageDestination =
tracer().extractDestination(message, defaultDestination);
context = tracer().startProducerSpan(messageDestination, message);
// TODO: why are we propagating context only in this advice class? the other one does not
// inject current span context into JMS message
scope = tracer().startProducerScope(context, message);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand All @@ -90,9 +88,9 @@ public static void stopSpan(
if (scope == null) {
return;
}
scope.close();
CallDepthThreadLocalMap.reset(MessageProducer.class);

scope.close();
if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
Expand Down Expand Up @@ -129,6 +127,7 @@ public static void stopSpan(
}
CallDepthThreadLocalMap.reset(MessageProducer.class);

scope.close();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...aaaand one more scope leak detected 😄
It was worth adding that one test case

if (throwable != null) {
tracer().endExceptionally(context, throwable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
import static io.opentelemetry.javaagent.instrumentation.jms.MessageExtractAdapter.GETTER;
import static io.opentelemetry.javaagent.instrumentation.jms.MessageInjectAdapter.SETTER;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -43,12 +41,7 @@ public Context startConsumerSpan(
MessageDestination destination, String operation, Message message, long startTime) {
Context parentContext = Context.root();
if (message != null && "process".equals(operation)) {
// TODO use BaseTracer.extract() which has context leak detection
// (and fix the context leak that it is currently detecting when running Jms2Test)
parentContext =
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.extract(Context.root(), message, GETTER);
parentContext = extract(message, GETTER);
}

SpanBuilder spanBuilder =
Expand All @@ -64,12 +57,9 @@ public Context startProducerSpan(MessageDestination destination, Message message
Context parentContext = Context.current();
SpanBuilder span = spanBuilder(parentContext, spanName(destination, "send"), PRODUCER);
afterStart(span, destination, message);
return parentContext.with(span.startSpan());
}

public Scope startProducerScope(Context context, Message message) {
Context context = parentContext.with(span.startSpan());
inject(context, message, SETTER);
return context.makeCurrent();
return context;
}

public String spanName(MessageDestination destination, String operation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import org.slf4j.LoggerFactory
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import spock.lang.Shared
import spock.lang.Unroll

@Unroll
class Jms1Test extends AgentInstrumentationSpecification {

private static final Logger logger = LoggerFactory.getLogger(Jms1Test)
Expand Down Expand Up @@ -224,6 +226,47 @@ class Jms1Test extends AgentInstrumentationSpecification {
session.createTemporaryTopic() | "topic" | "(temporary)"
}

def "sending a message to #destinationName #destinationType with explicit destination propagates context"() {
given:
def producer = session.createProducer(null)
def consumer = session.createConsumer(destination)

def lock = new CountDownLatch(1)
def messageRef = new AtomicReference<TextMessage>()
consumer.setMessageListener new MessageListener() {
@Override
void onMessage(Message message) {
lock.await() // ensure the producer trace is reported first.
messageRef.set(message)
}
}

when:
producer.send(destination, message)
lock.countDown()

then:
assertTraces(1) {
trace(0, 2) {
producerSpan(it, 0, destinationType, destinationName)
consumerSpan(it, 1, destinationType, destinationName, messageRef.get().getJMSMessageID(), span(0), "process")
}
}
// This check needs to go after all traces have been accounted for
messageRef.get().text == messageText

cleanup:
producer.close()
consumer.close()

where:
destination | destinationType | destinationName
session.createQueue("someQueue") | "queue" | "someQueue"
session.createTopic("someTopic") | "topic" | "someTopic"
session.createTemporaryQueue() | "queue" | "(temporary)"
session.createTemporaryTopic() | "topic" | "(temporary)"
}

static producerSpan(TraceAssert trace, int index, String destinationType, String destinationName) {
trace.span(index) {
name destinationName + " send"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ protected Boolean computeValue(Class<?> taskClass) {
if (enclosingClass.getName().equals("com.squareup.okhttp.ConnectionPool")) {
return false;
}

// Avoid instrumenting internal OrderedExecutor worker class
if (enclosingClass
.getName()
.equals("org.hornetq.utils.OrderedExecutorFactory$OrderedExecutor")) {
return false;
}
}

return true;
Expand Down