diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index c7a5d45bff..6794847fb9 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -90,8 +90,6 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final boolean tracingEnabled; - private final boolean includeCommandArgsInSpanTags; - private final float discardReadBytesRatio; private final boolean boundedQueues; @@ -138,7 +136,6 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc Tracing tracing = clientResources.tracing(); this.tracingEnabled = tracing.isEnabled(); - this.includeCommandArgsInSpanTags = tracing.includeCommandArgsInSpanTags(); float bufferUsageRatio = clientOptions.getBufferUsageRatio(); this.discardReadBytesRatio = bufferUsageRatio / (bufferUsageRatio + 1); @@ -402,35 +399,11 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand TraceContext context = provider.getTraceContext(); Tracer.Span span = tracer.nextSpan(context); - span.name(command.getType().name()); - - if (includeCommandArgsInSpanTags && command.getArgs() != null) { - span.tag("redis.args", command.getArgs().toCommandString()); - } - - span.remoteEndpoint(tracedEndpoint); - span.start(); + span.remoteEndpoint(tracedEndpoint).start(command); if (traced != null) { traced.setSpan(span); } - - CompleteableCommand completeableCommand = (CompleteableCommand) command; - completeableCommand.onComplete((o, throwable) -> { - - if (command.getOutput() != null) { - - String error = command.getOutput().getError(); - if (error != null) { - span.tag("error", error); - } else if (throwable != null) { - span.tag("exception", throwable.toString()); - span.error(throwable); - } - } - - span.finish(); - }); } ctx.write(command, promise); diff --git a/src/main/java/io/lettuce/core/tracing/BraveTracing.java b/src/main/java/io/lettuce/core/tracing/BraveTracing.java index dabeced5a7..7ca69df7a1 100644 --- a/src/main/java/io/lettuce/core/tracing/BraveTracing.java +++ b/src/main/java/io/lettuce/core/tracing/BraveTracing.java @@ -17,12 +17,15 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.function.BiConsumer; import java.util.function.Consumer; import reactor.core.publisher.Mono; import brave.Span; import brave.propagation.TraceContextOrSamplingFlags; import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.protocol.CompleteableCommand; +import io.lettuce.core.protocol.RedisCommand; /** * {@link Tracing} integration with OpenZipkin's Brave {@link brave.Tracer}. This implementation creates Brave @@ -71,7 +74,7 @@ private BraveTracing(Builder builder) { LettuceAssert.notNull(builder.serviceName, "Service name must not be null"); this.tracingOptions = new BraveTracingOptions(builder.serviceName, builder.endpointCustomizer, builder.spanCustomizer); - this.tracer = new BraveTracer(builder.tracing, this.tracingOptions); + this.tracer = new BraveTracer(builder.tracing, this.tracingOptions, builder.includeCommandArgsInSpanTags); this.includeCommandArgsInSpanTags = builder.includeCommandArgsInSpanTags; } @@ -109,7 +112,7 @@ public static class Builder { private Consumer endpointCustomizer = it -> { }; - private Consumer spanCustomizer = it -> { + private BiConsumer, Span> spanCustomizer = (command, span) -> { }; private boolean includeCommandArgsInSpanTags = true; @@ -193,6 +196,22 @@ public Builder spanCustomizer(Consumer spanCustomizer) { LettuceAssert.notNull(spanCustomizer, "Span customizer must not be null!"); + this.spanCustomizer = (command, span) -> spanCustomizer.accept(span); + return this; + } + + /** + * Sets an {@link brave.Span} customizer to customize the {@link brave.Span} based on the underlying + * {@link RedisCommand}. The customizer is invoked before {@link Span#finish()} finishing} the span. + * + * @param spanCustomizer must not be {@code null}. + * @return {@code this} {@link Builder}. + * @since 6.0 + */ + public Builder spanCustomizer(BiConsumer, brave.Span> spanCustomizer) { + + LettuceAssert.notNull(spanCustomizer, "Span customizer must not be null!"); + this.spanCustomizer = spanCustomizer; return this; } @@ -257,9 +276,12 @@ static class BraveTracer extends Tracer { private final BraveTracingOptions tracingOptions; - BraveTracer(brave.Tracing tracing, BraveTracingOptions tracingOptions) { + private final boolean includeCommandArgsInSpanTags; + + BraveTracer(brave.Tracing tracing, BraveTracingOptions tracingOptions, boolean includeCommandArgsInSpanTags) { this.tracing = tracing; this.tracingOptions = tracingOptions; + this.includeCommandArgsInSpanTags = includeCommandArgsInSpanTags; } @Override @@ -290,7 +312,7 @@ private Span postProcessSpan(brave.Span span) { return NoOpTracing.NoOpSpan.INSTANCE; } - return new BraveSpan(span.kind(brave.Span.Kind.CLIENT), this.tracingOptions); + return new BraveSpan(span.kind(brave.Span.Kind.CLIENT), this.tracingOptions, includeCommandArgsInSpanTags); } } @@ -304,15 +326,47 @@ static class BraveSpan extends Tracer.Span { private final BraveTracingOptions tracingOptions; - BraveSpan(Span span, BraveTracingOptions tracingOptions) { + private final boolean includeCommandArgsInSpanTags; + + BraveSpan(Span span, BraveTracingOptions tracingOptions, boolean includeCommandArgsInSpanTags) { this.span = span; this.tracingOptions = tracingOptions; + this.includeCommandArgsInSpanTags = includeCommandArgsInSpanTags; } @Override - public BraveSpan start() { + public BraveSpan start(RedisCommand command) { + + span.name(command.getType().name()); + + if (includeCommandArgsInSpanTags && command.getArgs() != null) { + span.tag("redis.args", command.getArgs().toCommandString()); + } + + if (command instanceof CompleteableCommand) { + CompleteableCommand completeableCommand = (CompleteableCommand) command; + completeableCommand.onComplete((o, throwable) -> { + + if (command.getOutput() != null) { + + String error = command.getOutput().getError(); + if (error != null) { + span.tag("error", error); + } else if (throwable != null) { + span.tag("exception", throwable.toString()); + span.error(throwable); + } + } + + span.finish(); + }); + } else { + throw new IllegalArgumentException("Command " + command + + " must implement CompleteableCommand to attach Span completion to command completion"); + } span.start(); + this.tracingOptions.customizeSpan(command, span); return this; } @@ -352,7 +406,14 @@ public BraveSpan error(Throwable throwable) { @Override public BraveSpan remoteEndpoint(Endpoint endpoint) { - span.remoteEndpoint(BraveEndpoint.class.cast(endpoint).endpoint); + zipkin2.Endpoint zkEndpoint = BraveEndpoint.class.cast(endpoint).endpoint; + + if (zkEndpoint.serviceName() != null) { + span.remoteServiceName(zkEndpoint.serviceName()); + } + + String ip = zkEndpoint.ipv6() != null ? zkEndpoint.ipv6() : zkEndpoint.ipv4(); + span.remoteIpAndPort(ip, zkEndpoint.portAsInt()); return this; } @@ -360,7 +421,6 @@ public BraveSpan remoteEndpoint(Endpoint endpoint) { @Override public void finish() { - this.tracingOptions.customizeSpan(span); span.finish(); } @@ -448,10 +508,10 @@ static class BraveTracingOptions { private final Consumer endpointCustomizer; - private final Consumer spanCustomizer; + private final BiConsumer, brave.Span> spanCustomizer; BraveTracingOptions(String serviceName, Consumer endpointCustomizer, - Consumer spanCustomizer) { + BiConsumer, brave.Span> spanCustomizer) { this.serviceName = serviceName; this.endpointCustomizer = endpointCustomizer; this.spanCustomizer = spanCustomizer; @@ -461,8 +521,9 @@ void customizeEndpoint(zipkin2.Endpoint.Builder builder) { this.endpointCustomizer.accept(builder); } - void customizeSpan(brave.Span span) { - this.spanCustomizer.accept(span); + @SuppressWarnings("unchecked") + void customizeSpan(RedisCommand command, brave.Span span) { + this.spanCustomizer.accept((RedisCommand) command, span); } } diff --git a/src/main/java/io/lettuce/core/tracing/NoOpTracing.java b/src/main/java/io/lettuce/core/tracing/NoOpTracing.java index 3c36e98213..1a165ffcbf 100644 --- a/src/main/java/io/lettuce/core/tracing/NoOpTracing.java +++ b/src/main/java/io/lettuce/core/tracing/NoOpTracing.java @@ -15,6 +15,8 @@ */ package io.lettuce.core.tracing; +import io.lettuce.core.protocol.RedisCommand; + import java.net.SocketAddress; /** @@ -87,7 +89,7 @@ public static class NoOpSpan extends Tracer.Span { static final NoOpSpan INSTANCE = new NoOpSpan(); @Override - public Tracer.Span start() { + public Tracer.Span start(RedisCommand command) { return this; } diff --git a/src/main/java/io/lettuce/core/tracing/Tracer.java b/src/main/java/io/lettuce/core/tracing/Tracer.java index 26f5a0a656..5dc48bda7f 100644 --- a/src/main/java/io/lettuce/core/tracing/Tracer.java +++ b/src/main/java/io/lettuce/core/tracing/Tracer.java @@ -15,6 +15,8 @@ */ package io.lettuce.core.tracing; +import io.lettuce.core.protocol.RedisCommand; + /** * Tracing abstraction to create {@link Span}s to capture latency and behavior of Redis commands. * @@ -48,9 +50,10 @@ public abstract static class Span { /** * Starts the span with. * + * @param command the underlying command. * @return {@literal this} {@link Span}. */ - public abstract Span start(); + public abstract Span start(RedisCommand command); /** * Sets the name for this {@link Span}. diff --git a/src/test/java/io/lettuce/core/tracing/BraveTracingUnitTests.java b/src/test/java/io/lettuce/core/tracing/BraveTracingUnitTests.java index ae2e6448d4..99bbc3f512 100644 --- a/src/test/java/io/lettuce/core/tracing/BraveTracingUnitTests.java +++ b/src/test/java/io/lettuce/core/tracing/BraveTracingUnitTests.java @@ -17,6 +17,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.List; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; @@ -26,15 +27,21 @@ import org.junit.jupiter.api.Test; import org.springframework.test.util.ReflectionTestUtils; +import io.lettuce.core.protocol.AsyncCommand; import zipkin2.Span; +import brave.Tag; import brave.Tracer; import brave.Tracing; import brave.handler.MutableSpan; import brave.propagation.CurrentTraceContext; import io.lettuce.core.TestSupport; +import io.lettuce.core.protocol.Command; +import io.lettuce.core.protocol.CommandType; import io.netty.channel.unix.DomainSocketAddress; /** + * Unit tests for {@link BraveTracing}. + * * @author Mark Paluch * @author Daniel Albuquerque */ @@ -109,13 +116,14 @@ void shouldCustomizeEndpoint() { void shouldCustomizeSpan() { BraveTracing tracing = BraveTracing.builder().tracing(clientTracing) - .spanCustomizer(it -> it.remoteServiceName("remote")).build(); + .spanCustomizer((command, span) -> span.tag("cmd", command.getType().name())).build(); BraveTracing.BraveSpan span = (BraveTracing.BraveSpan) tracing.getTracerProvider().getTracer().nextSpan(); - span.finish(); + span.start(new AsyncCommand<>(new Command<>(CommandType.AUTH, null))); MutableSpan braveSpan = (MutableSpan) ReflectionTestUtils.getField(span.getSpan(), "state"); + List tags = (List) ReflectionTestUtils.getField(braveSpan, "tags"); - assertThat(braveSpan.remoteServiceName()).isEqualTo("remote"); + assertThat(tags).contains("cmd", "AUTH"); } }