Skip to content

Commit

Permalink
Merge pull request quarkusio#45478 from arn-ivu/mutiny-tracing-helper
Browse files Browse the repository at this point in the history
Add helper methods for manual spans in mutiny pipelines
  • Loading branch information
ozangunalp authored Jan 16, 2025
2 parents f9979c9 + 1d9029d commit 1e499cf
Show file tree
Hide file tree
Showing 4 changed files with 493 additions and 0 deletions.
54 changes: 54 additions & 0 deletions docs/src/main/asciidoc/opentelemetry-tracing.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,60 @@ public void tracedWork() {
}
----

=== Mutiny
Methods returning reactive types can also be annotated with `@WithSpan` and `@AddingSpanAttributes` to create a new span or add attributes to the current span.

If you need to create spans manually within a mutiny pipeline, use `wrapWithSpan` method from `io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper`.

Example. Assuming you have the following pipeline:
[source,java]
----
Uni<String> uni = Uni.createFrom().item("hello")
//start trace here
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
//end trace here
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
wrap it like this:
[source,java]
----
import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
...
@Inject
Tracer tracer;
...
Context context = Context.current();
Uni<String> uni = Uni.createFrom().item("hello")
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(context), "my-span-name",
Uni.createFrom().item(m)
.onItem().transform(item -> item + " world")
.onItem().transform(item -> item + "!")
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
for multi-pipelines it works similarly:
[source,java]
----
Multi.createFrom().items("Alice", "Bob", "Charlie")
.transformToMultiAndConcatenate(m -> TracingHelper.withTrace("my-span-name",
Multi.createFrom().item(m)
.onItem().transform(name -> "Hello " + name)
))
.subscribe().with(
item -> System.out.println("Received: " + item),
failure -> System.out.println("Failed with " + failure)
);
----
Instead of `transformToMultiAndConcatenate` you can use `transformToMultiAndMerge` if you don't care about the order of the items.

=== Quarkus Messaging - Kafka

When using the Quarkus Messaging extension for Kafka,
Expand Down
5 changes: 5 additions & 0 deletions extensions/opentelemetry/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@
<artifactId>vertx-web-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-mutiny-vertx-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes-deployment</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
package io.quarkus.opentelemetry.deployment.traces;

import static io.quarkus.opentelemetry.runtime.tracing.mutiny.MutinyTracingHelper.wrapWithSpan;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

import jakarta.inject.Inject;

import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporter;
import io.quarkus.opentelemetry.deployment.common.exporter.TestSpanExporterProvider;
import io.quarkus.opentelemetry.runtime.QuarkusContextStorage;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.helpers.test.UniAssertSubscriber;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

class MutinyTracingHelperTest {

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.setArchiveProducer(
() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(TestSpanExporter.class, TestSpanExporterProvider.class)
.addAsResource(new StringAsset(TestSpanExporterProvider.class.getCanonicalName()),
"META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider"));

@Inject
private TestSpanExporter spanExporter;

@Inject
private Tracer tracer;

@Inject
private Vertx vertx;

@AfterEach
void tearDown() {
spanExporter.reset();
}

@ParameterizedTest(name = "{index}: Simple uni pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleUniPipeline(final String contextType, final String contextName) {

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, "testSpan",
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan").startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");

//ensure there are two spans with subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder("testSpan", "subspan");
assertChildSpan(spans, "testSpan", "subspan");
}

@ParameterizedTest(name = "{index}: Explicit parent {1}")
@MethodSource("generateContextRunners")
void testSpanWithExplicitParent(final String contextType, final String contextName) {

final String parentSpanName = "parentSpan";
final String pipelineSpanName = "pipelineSpan";
final String subspanName = "subspan";

final Span parentSpan = tracer.spanBuilder(parentSpanName).startSpan();
final io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current().with(parentSpan);

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("Hello")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, Optional.of(parentContext),
pipelineSpanName,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder(subspanName).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " world";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem().assertItem("Hello world");
parentSpan.end();

//ensure there are 3 spans with proper parent-child relationships
final List<SpanData> spans = spanExporter.getFinishedSpanItems(3);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, pipelineSpanName,
subspanName);
assertChildSpan(spans, parentSpanName, pipelineSpanName);
assertChildSpan(spans, pipelineSpanName, subspanName);
}

@ParameterizedTest(name = "{index}: Nested uni pipeline with implicit parent {1}")
@MethodSource("generateContextRunners")
void testNestedPipeline_implicitParent(final String contextType,
final String contextName) {

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans with doSomething and doSomethingAsync as children of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertChildSpan(spans, parentSpanName, childSpanName);
}

@ParameterizedTest(name = "{index}: Nested uni pipeline with explicit no parent {1}")
@MethodSource("generateContextRunners")
void testNestedPipeline_explicitNoParent(final String contextType, final String contextName) {

final String parentSpanName = "parentSpan";
final String childSpanName = "childSpan";

final UniAssertSubscriber<String> subscriber = Uni.createFrom()
.item("test")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUni(m -> wrapWithSpan(tracer, parentSpanName,
Uni.createFrom().item(m)
.onItem().transform(s -> s + " in outer span")
.onItem().transformToUni(m1 -> wrapWithSpan(tracer, Optional.empty(), childSpanName,
Uni.createFrom().item(m1)
.onItem().transform(s -> "now in inner span")))

))
.subscribe()
.withSubscriber(new UniAssertSubscriber<>());

subscriber.awaitItem();

//ensure there are 2 spans but without parent-child relationship
final List<SpanData> spans = spanExporter.getFinishedSpanItems(2);
assertThat(spans.stream().map(SpanData::getName)).containsExactlyInAnyOrder(parentSpanName, childSpanName);
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo("0000000000000000");//signifies no parent
}

@ParameterizedTest(name = "{index}: Concatenating multi pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleMultiPipeline_Concatenate(final String contextType, final String contextName) {

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUniAndConcatenate(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
//the traced pipeline
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion().assertItems("test1 transformed", "test2 transformed", "test3 transformed");

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

@ParameterizedTest(name = "{index}: Merging multi pipeline {1}")
@MethodSource("generateContextRunners")
void testSimpleMultiPipeline_Merge(final String contextType, final String contextName) {

final AssertSubscriber<String> subscriber = Multi.createFrom()
.items("test1", "test2", "test3")
.emitOn(r -> runOnContext(r, vertx, contextType))
.onItem()
.transformToUniAndMerge(m -> wrapWithSpan(tracer, Optional.empty(), "testSpan " + m,
Uni.createFrom().item(m).onItem().transform(s -> {
final Span span = tracer.spanBuilder("subspan " + s).startSpan();
try (final Scope scope = span.makeCurrent()) {
return s + " transformed";
} finally {
span.end();
}
})))
.subscribe()
.withSubscriber(AssertSubscriber.create(3));

subscriber.awaitCompletion();

//ensure there are six spans with three pairs of subspan as child of testSpan
final List<SpanData> spans = spanExporter.getFinishedSpanItems(6);
for (int i = 1; i <= 3; i++) {
final int currentI = i;
assertThat(spans.stream().anyMatch(span -> span.getName().equals("testSpan test" + currentI))).isTrue();
assertThat(spans.stream().anyMatch(span -> span.getName().equals("subspan test" + currentI))).isTrue();
assertChildSpan(spans, "testSpan test" + currentI, "subspan test" + currentI);
}
}

private static void assertChildSpan(final List<SpanData> spans, final String parentSpanName,
final String childSpanName1) {
assertThat(spans.stream()
.filter(span -> span.getName().equals(childSpanName1))
.findAny()
.orElseThrow()
.getParentSpanId()).isEqualTo(
spans.stream().filter(span -> span.getName().equals(parentSpanName)).findAny().get().getSpanId());
}

private static Stream<Arguments> generateContextRunners() {
return Stream.of(
Arguments.of("WITHOUT_CONTEXT", "Without Context"),
Arguments.of("ROOT_CONTEXT", "On Root Context"),
Arguments.of("DUPLICATED_CONTEXT", "On Duplicated Context"));
}

private void runOnContext(final Runnable runnable, final Vertx vertx, final String contextType) {
switch (contextType) {
case "WITHOUT_CONTEXT":
runWithoutContext(runnable);
break;
case "ROOT_CONTEXT":
runOnRootContext(runnable, vertx);
break;
case "DUPLICATED_CONTEXT":
runOnDuplicatedContext(runnable, vertx);
break;
default:
throw new IllegalArgumentException("Unknown context type: " + contextType);
}
}

private static void runWithoutContext(final Runnable runnable) {
assertThat(QuarkusContextStorage.getVertxContext()).isNull();
runnable.run();
}

private static void runOnRootContext(final Runnable runnable, final Vertx vertx) {
final Context rootContext = VertxContext.getRootContext(vertx.getOrCreateContext());
assertThat(rootContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(rootContext)).isFalse();
assertThat(rootContext).isNotEqualTo(QuarkusContextStorage.getVertxContext());

rootContext.runOnContext(v -> runnable.run());
}

private static void runOnDuplicatedContext(final Runnable runnable, final Vertx vertx) {
final Context duplicatedContext = VertxContext.createNewDuplicatedContext(vertx.getOrCreateContext());
assertThat(duplicatedContext).isNotNull();
assertThat(VertxContext.isDuplicatedContext(duplicatedContext)).isTrue();

duplicatedContext.runOnContext(v -> runnable.run());
}

}
Loading

0 comments on commit 1e499cf

Please sign in to comment.