Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate context to lettuce callbacks #3839

Merged
merged 1 commit into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.lettuce.v4_0;

import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;

import com.lambdaworks.redis.protocol.AsyncCommand;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.InstrumentationContext;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class LettuceAsyncCommandInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("com.lambdaworks.redis.protocol.AsyncCommand");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), LettuceAsyncCommandInstrumentation.class.getName() + "$SaveContextAdvice");
transformer.applyAdviceToMethod(
named("complete").or(named("completeExceptionally")).or(named("cancel")),
LettuceAsyncCommandInstrumentation.class.getName() + "$RestoreContextAdvice");
}

@SuppressWarnings("unused")
public static class SaveContextAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void saveContext(@Advice.This AsyncCommand<?, ?, ?> asyncCommand) {
Context context = Java8BytecodeBridge.currentContext();
// get the context that submitted this command and attach it, it will be used to run callbacks
context = context.get(LettuceSingletons.COMMAND_CONTEXT_KEY);
InstrumentationContext.get(AsyncCommand.class, Context.class).put(asyncCommand, context);
}
}

@SuppressWarnings("unused")
public static class RestoreContextAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This AsyncCommand<?, ?, ?> asyncCommand, @Advice.Local("otelScope") Scope scope) {
Context context =
InstrumentationContext.get(AsyncCommand.class, Context.class).get(asyncCommand);
scope = context.makeCurrent();
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(@Advice.Local("otelScope") Scope scope) {
scope.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ public static void onEnter(
@Advice.Argument(0) RedisCommand<?, ?, ?> command,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
context = instrumenter().start(currentContext(), command);
Context parentContext = currentContext();
context = instrumenter().start(parentContext, command);
// remember the context that called dispatch, it is used in LettuceAsyncCommandInstrumentation
context = context.with(LettuceSingletons.COMMAND_CONTEXT_KEY, parentContext);
scope = context.makeCurrent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public LettuceInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(new LettuceConnectInstrumentation(), new LettuceAsyncCommandsInstrumentation());
return asList(
new LettuceAsyncCommandInstrumentation(),
new LettuceAsyncCommandsInstrumentation(),
new LettuceConnectInstrumentation());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.protocol.RedisCommand;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
Expand All @@ -19,9 +21,11 @@ public final class LettuceSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.lettuce-4.0";

private static final Instrumenter<RedisCommand<?, ?, ?>, Void> INSTRUMENTER;

private static final Instrumenter<RedisURI, Void> CONNECT_INSTRUMENTER;

public static final ContextKey<Context> COMMAND_CONTEXT_KEY =
ContextKey.named("opentelemetry-lettuce-v4_0-context-key");

static {
DbAttributesExtractor<RedisCommand<?, ?, ?>, Void> attributesExtractor =
new LettuceDbAttributesExtractor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/

import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR

import com.lambdaworks.redis.ClientOptions
Expand Down Expand Up @@ -180,29 +181,44 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
Consumer<String> consumer = new Consumer<String>() {
@Override
void accept(String res) {
conds.evaluate {
assert res == "TESTVAL"
runWithSpan("callback") {
conds.evaluate {
assert res == "TESTVAL"
}
}
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("TESTKEY")
redisFuture.thenAccept(consumer)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "GET"
"${SemanticAttributes.DB_STATEMENT.key}" "GET"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand All @@ -216,40 +232,62 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
BiFunction<String, Throwable, String> firstStage = new BiFunction<String, Throwable, String>() {
@Override
String apply(String res, Throwable throwable) {
conds.evaluate {
assert res == null
assert throwable == null
runWithSpan("callback1") {
conds.evaluate {
assert res == null
assert throwable == null
}
}
return (res == null ? successStr : res)
}
}
Function<String, Object> secondStage = new Function<String, Object>() {
@Override
Object apply(String input) {
conds.evaluate {
assert input == successStr
runWithSpan("callback2") {
conds.evaluate {
assert input == successStr
}
}
return null
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handleAsync(firstStage).thenApply(secondStage)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.get("NON_EXISTENT_KEY")
redisFuture.handle(firstStage).thenApply(secondStage)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "GET"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "GET"
"${SemanticAttributes.DB_STATEMENT.key}" "GET"
}
}
span(2) {
name "callback1"
kind INTERNAL
childOf(span(0))
}
span(3) {
name "callback2"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand All @@ -260,29 +298,44 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
void accept(String keyRetrieved, Throwable throwable) {
conds.evaluate {
assert keyRetrieved != null
runWithSpan("callback") {
conds.evaluate {
assert keyRetrieved != null
}
}
}
}

when:
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
runWithSpan("parent") {
RedisFuture<String> redisFuture = asyncCommands.randomkey()
redisFuture.whenCompleteAsync(biConsumer)
}

then:
conds.await()
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "RANDOMKEY"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "RANDOMKEY"
"${SemanticAttributes.DB_STATEMENT.key}" "RANDOMKEY"
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand Down Expand Up @@ -397,12 +450,16 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
setup:
asyncCommands.setAutoFlushCommands(false)
def conds = new AsyncConditions()
RedisFuture redisFuture = asyncCommands.sadd("SKEY", "1", "2")
RedisFuture redisFuture = runWithSpan("parent") {
asyncCommands.sadd("SKEY", "1", "2")
}
redisFuture.whenCompleteAsync({
res, throwable ->
conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
runWithSpan("callback") {
conds.evaluate {
assert throwable != null
assert throwable instanceof CancellationException
}
}
})

Expand All @@ -414,17 +471,28 @@ class LettuceAsyncClientTest extends AgentInstrumentationSpecification {
conds.await()
cancelSuccess == true
assertTraces(1) {
trace(0, 1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
span(1) {
name "SADD"
kind CLIENT
childOf(span(0))
attributes {
"${SemanticAttributes.DB_SYSTEM.key}" "redis"
"${SemanticAttributes.DB_OPERATION.key}" "SADD"
"${SemanticAttributes.DB_STATEMENT.key}" "SADD"
"lettuce.command.cancelled" true
}
}
span(2) {
name "callback"
kind INTERNAL
childOf(span(0))
}
}
}
}
Expand Down
Loading