From abbfdf69f1a71832fa2d4d90a2e010a876f3a9a6 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Sat, 28 Dec 2019 16:36:17 +0100 Subject: [PATCH] Refactor connection activation to ConnectionInitializer #697 Connections are now initialized with a ConnectionInitializer that performs the Handshake and issues initialization commands after the handshake. The ConnectionState keeps track of the desired state and applies it on connect and reconnect. This change removes the need to track the state of each connection flag in each connection implementation and moves all flags into a single state object. The initialization/handshake future is now also provided by RedisHandshakeHandler instead of living in various initializer types. The handshake for SSL and plaintext connections became simpler and easier to find. Centralizing the handshake is a pre-requisite for protocol version negotiation. --- .../io/lettuce/core/AbstractRedisClient.java | 23 ++- .../io/lettuce/core/ChannelGroupListener.java | 15 +- .../io/lettuce/core/ConnectionBuilder.java | 134 ++++---------- .../lettuce/core/ConnectionEventTrigger.java | 2 + .../io/lettuce/core/ConnectionEvents.java | 24 --- .../java/io/lettuce/core/ConnectionState.java | 173 +++++++++++++++++ .../lettuce/core/PlainChannelInitializer.java | 175 ------------------ .../lettuce/core/RedisChannelInitializer.java | 34 ---- .../core/RedisChannelInitializerImpl.java | 26 --- .../java/io/lettuce/core/RedisClient.java | 79 ++++---- .../io/lettuce/core/RedisCommandBuilder.java | 3 +- .../io/lettuce/core/SslConnectionBuilder.java | 129 +------------ .../core/StatefulRedisConnectionImpl.java | 52 ++---- .../{RedisState.java => CommandSet.java} | 6 +- ...RedisAdvancedClusterAsyncCommandsImpl.java | 4 +- ...isAdvancedClusterReactiveCommandsImpl.java | 4 +- .../core/cluster/RedisClusterClient.java | 84 +++++---- .../RedisClusterPubSubAsyncCommandsImpl.java | 8 +- ...edisClusterPubSubReactiveCommandsImpl.java | 8 +- .../StatefulRedisClusterConnectionImpl.java | 78 +++----- ...tefulRedisClusterPubSubConnectionImpl.java | 10 +- .../lettuce/core/protocol/CommandHandler.java | 36 ++-- .../core/protocol/ConnectionInitializer.java | 38 ++++ .../core/protocol/ConnectionWatchdog.java | 15 +- .../core/protocol/ReconnectionHandler.java | 131 ++++--------- .../core/protocol/RedisHandshakeHandler.java | 157 ++++++++++++++++ .../protocol/SslRedisHandshakeHandler.java | 58 ++++++ .../StatefulRedisSentinelConnectionImpl.java | 27 ++- .../ConnectionCommandIntegrationTests.java | 18 +- ...s.java => CommandSetIntegrationTests.java} | 7 +- .../RedisClusterClientIntegrationTests.java | 5 +- .../protocol/CommandHandlerUnitTests.java | 10 - .../pubsub/PubSubCommandHandlerUnitTests.java | 2 +- .../tracing/BraveTracingIntegrationTests.java | 46 ++--- 34 files changed, 740 insertions(+), 881 deletions(-) create mode 100644 src/main/java/io/lettuce/core/ConnectionState.java delete mode 100644 src/main/java/io/lettuce/core/PlainChannelInitializer.java delete mode 100644 src/main/java/io/lettuce/core/RedisChannelInitializer.java delete mode 100644 src/main/java/io/lettuce/core/RedisChannelInitializerImpl.java rename src/main/java/io/lettuce/core/cluster/{RedisState.java => CommandSet.java} (94%) create mode 100644 src/main/java/io/lettuce/core/protocol/ConnectionInitializer.java create mode 100644 src/main/java/io/lettuce/core/protocol/RedisHandshakeHandler.java create mode 100644 src/main/java/io/lettuce/core/protocol/SslRedisHandshakeHandler.java rename src/test/java/io/lettuce/core/cluster/{RedisStateIntegrationTests.java => CommandSetIntegrationTests.java} (91%) diff --git a/src/main/java/io/lettuce/core/AbstractRedisClient.java b/src/main/java/io/lettuce/core/AbstractRedisClient.java index a456487905..538ceedcd4 100644 --- a/src/main/java/io/lettuce/core/AbstractRedisClient.java +++ b/src/main/java/io/lettuce/core/AbstractRedisClient.java @@ -15,8 +15,6 @@ */ package io.lettuce.core; -import static java.util.concurrent.CompletableFuture.completedFuture; - import java.io.Closeable; import java.net.SocketAddress; import java.time.Duration; @@ -33,6 +31,7 @@ import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.ConnectionWatchdog; +import io.lettuce.core.protocol.RedisHandshakeHandler; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import io.netty.bootstrap.Bootstrap; @@ -166,10 +165,6 @@ protected void connectionBuilder(Mono socketAddressSupplier, Conn connectionBuilder.socketAddressSupplier(socketAddressSupplier); } - private boolean hasPassword(RedisURI connectionSettings) { - return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0; - } - protected void channelType(ConnectionBuilder connectionBuilder, ConnectionPoint connectionPoint) { LettuceAssert.notNull(connectionPoint, "ConnectionPoint must not be null"); @@ -309,18 +304,16 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple Bootstrap redisBootstrap = connectionBuilder.bootstrap(); - RedisChannelInitializer initializer = connectionBuilder.build(); + ChannelInitializer initializer = connectionBuilder.build(); redisBootstrap.handler(initializer); clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap); - CompletableFuture initFuture = initializer.channelInitialized(); ChannelFuture connectFuture = redisBootstrap.connect(redisAddress); channelReadyFuture.whenComplete((c, t) -> { if (t instanceof CancellationException) { connectFuture.cancel(true); - initFuture.cancel(true); } }); @@ -334,7 +327,14 @@ private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, Comple return; } - initFuture.whenComplete((success, throwable) -> { + RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class); + + if (handshakeHandler == null) { + channelReadyFuture.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered")); + return; + } + + handshakeHandler.channelInitialized().whenComplete((success, throwable) -> { if (throwable == null) { @@ -432,7 +432,6 @@ public CompletableFuture shutdownAsync() { * @param timeUnit the unit of {@code quietPeriod} and {@code timeout} * @since 4.4 */ - @SuppressWarnings("rawtypes") public CompletableFuture shutdownAsync(long quietPeriod, long timeout, TimeUnit timeUnit) { if (shutdown.compareAndSet(false, true)) { @@ -441,7 +440,7 @@ public CompletableFuture shutdownAsync(long quietPeriod, long timeout, Tim return closeResources().thenCompose((value) -> closeClientResources(quietPeriod, timeout, timeUnit)); } - return completedFuture(null); + return CompletableFuture.completedFuture(null); } private CompletableFuture closeResources() { diff --git a/src/main/java/io/lettuce/core/ChannelGroupListener.java b/src/main/java/io/lettuce/core/ChannelGroupListener.java index 84236302b5..f95c094f51 100644 --- a/src/main/java/io/lettuce/core/ChannelGroupListener.java +++ b/src/main/java/io/lettuce/core/ChannelGroupListener.java @@ -15,6 +15,12 @@ */ package io.lettuce.core; +import static io.lettuce.core.ConnectionEventTrigger.local; +import static io.lettuce.core.ConnectionEventTrigger.remote; + +import io.lettuce.core.event.EventBus; +import io.lettuce.core.event.connection.ConnectedEvent; +import io.lettuce.core.event.connection.DisconnectedEvent; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -25,23 +31,28 @@ * ChannelGroup. * * @author Will Glozer + * @author Mark Paluch */ class ChannelGroupListener extends ChannelInboundHandlerAdapter { - private ChannelGroup channels; + private final ChannelGroup channels; + private final EventBus eventBus; - public ChannelGroupListener(ChannelGroup channels) { + public ChannelGroupListener(ChannelGroup channels, EventBus eventBus) { this.channels = channels; + this.eventBus = eventBus; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + eventBus.publish(new ConnectedEvent(local(ctx), remote(ctx))); channels.add(ctx.channel()); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + eventBus.publish(new DisconnectedEvent(local(ctx), remote(ctx))); channels.remove(ctx.channel()); super.channelInactive(ctx); } diff --git a/src/main/java/io/lettuce/core/ConnectionBuilder.java b/src/main/java/io/lettuce/core/ConnectionBuilder.java index c6950d1ebf..836e6d3692 100644 --- a/src/main/java/io/lettuce/core/ConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/ConnectionBuilder.java @@ -16,21 +16,19 @@ package io.lettuce.core; import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.Charset; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; import reactor.core.publisher.Mono; -import io.lettuce.core.codec.StringCodec; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.protocol.*; import io.lettuce.core.resource.ClientResources; import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; import io.netty.channel.group.ChannelGroup; import io.netty.util.Timer; @@ -41,13 +39,6 @@ */ public class ConnectionBuilder { - private static final RedisCommandBuilder INITIALIZING_CMD_BUILDER = new RedisCommandBuilder<>( - StringCodec.UTF8); - - private static final Supplier> PING = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.ping()); - - private static final Supplier> NO_PING = () -> null; - private Mono socketAddressSupplier; private ConnectionEvents connectionEvents; private RedisChannelHandler connection; @@ -59,57 +50,21 @@ public class ConnectionBuilder { private ClientOptions clientOptions; private Duration timeout; private ClientResources clientResources; - private String username; - private char[] password; - private String clientName; + private ConnectionInitializer connectionInitializer; private ReconnectionListener reconnectionListener = ReconnectionListener.NO_OP; private ConnectionWatchdog connectionWatchdog; - private Supplier> handshakeCommandSupplier = ConnectionBuilder.PING; public static ConnectionBuilder connectionBuilder() { return new ConnectionBuilder(); } - /** - * @param handshakeCommandSupplier - * @return {@literal true} whether {@code PING}/{@code HELLO} handshake is enabled. - */ - static boolean isHandshakeEnabled(Supplier> handshakeCommandSupplier) { - return handshakeCommandSupplier != NO_PING; - } - /** * Apply settings from {@link RedisURI} * * @param redisURI */ public void apply(RedisURI redisURI) { - timeout(redisURI.getTimeout()); - clientName(redisURI.getClientName()); - - if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP2) { - - pingBeforeConnect(clientOptions.isPingBeforeActivateConnection()); - - if (clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) { - auth(redisURI.getPassword()); - handshakeAuthResp2(); - } - } else if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP3) { - - if (hasPassword(redisURI)) { - if (LettuceStrings.isNotEmpty(redisURI.getUsername())) { - auth(redisURI.getUsername(), redisURI.getPassword()); - } else { - auth("default", redisURI.getPassword()); - } - handshakeAuthResp3(); - - } else { - handshakeResp3(); - } - } } protected List buildHandlers() { @@ -119,13 +74,15 @@ protected List buildHandlers() { LettuceAssert.assertState(connection != null, "Connection must be set"); LettuceAssert.assertState(clientResources != null, "ClientResources must be set"); LettuceAssert.assertState(endpoint != null, "Endpoint must be set"); + LettuceAssert.assertState(connectionInitializer != null, "ConnectionInitializer must be set"); List handlers = new ArrayList<>(); connection.setOptions(clientOptions); - handlers.add(new ChannelGroupListener(channelGroup)); + handlers.add(new ChannelGroupListener(channelGroup, clientResources.eventBus())); handlers.add(new CommandEncoder()); + handlers.add(getHandshakeHandler()); handlers.add(commandHandlerSupplier.get()); handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus())); @@ -137,22 +94,8 @@ protected List buildHandlers() { return handlers; } - void pingBeforeConnect(boolean state) { - handshakeCommandSupplier = state ? PING : NO_PING; - } - - void handshakeAuthResp2() { - handshakeCommandSupplier = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.auth(new String(password))); - } - - void handshakeAuthResp3() { - handshakeCommandSupplier = () -> new AsyncCommand<>(INITIALIZING_CMD_BUILDER.hello(3, this.username.getBytes(), - encode(this.password), this.clientName != null ? this.clientName.getBytes() : null)); - } - - void handshakeResp3() { - handshakeCommandSupplier = () -> new AsyncCommand<>( - INITIALIZING_CMD_BUILDER.hello(3, null, null, this.clientName != null ? this.clientName.getBytes() : null)); + protected ChannelHandler getHandshakeHandler() { + return new RedisHandshakeHandler(connectionInitializer, clientResources, timeout); } protected ConnectionWatchdog createConnectionWatchdog() { @@ -175,8 +118,8 @@ protected ConnectionWatchdog createConnectionWatchdog() { return watchdog; } - public RedisChannelInitializer build() { - return new PlainChannelInitializer(handshakeCommandSupplier, this::buildHandlers, clientResources, timeout); + public ChannelInitializer build() { + return new PlainChannelInitializer(this::buildHandlers, clientResources); } public ConnectionBuilder socketAddressSupplier(Mono socketAddressSupplier) { @@ -250,29 +193,8 @@ public ConnectionBuilder clientResources(ClientResources clientResources) { return this; } - public ConnectionBuilder clientName(String clientName) { - this.clientName = clientName; - return this; - } - - boolean hasPassword(RedisURI connectionSettings) { - return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0; - } - - public ConnectionBuilder auth(String username, char[] password) { - this.username = username; - this.password = password; - return this; - } - - public ConnectionBuilder auth(char[] password) { - this.password = password; - return this; - } - - @Deprecated - public ConnectionBuilder password(char[] password) { - this.password = password; + public ConnectionBuilder connectionInitializer(ConnectionInitializer connectionInitializer) { + this.connectionInitializer = connectionInitializer; return this; } @@ -292,24 +214,32 @@ public ClientResources clientResources() { return clientResources; } - public char[] password() { - return password; - } - public Endpoint endpoint() { return endpoint; } - Supplier> getHandshakeCommandSupplier() { - return handshakeCommandSupplier; - } + static class PlainChannelInitializer extends ChannelInitializer { + + private final Supplier> handlers; + private final ClientResources clientResources; + + PlainChannelInitializer(Supplier> handlers, ClientResources clientResources) { + this.handlers = handlers; + this.clientResources = clientResources; + } - static byte[] encode(char[] chars) { + @Override + protected void initChannel(Channel channel) { + doInitialize(channel); + } - ByteBuffer encoded = Charset.defaultCharset().encode(CharBuffer.wrap(chars)); - byte[] bytes = new byte[encoded.remaining()]; - encoded.get(bytes); + private void doInitialize(Channel channel) { - return bytes; + for (ChannelHandler handler : handlers.get()) { + channel.pipeline().addLast(handler); + } + + clientResources.nettyCustomizer().afterChannelInitialized(channel); + } } } diff --git a/src/main/java/io/lettuce/core/ConnectionEventTrigger.java b/src/main/java/io/lettuce/core/ConnectionEventTrigger.java index 2a38b82abb..6bdd41fc2a 100644 --- a/src/main/java/io/lettuce/core/ConnectionEventTrigger.java +++ b/src/main/java/io/lettuce/core/ConnectionEventTrigger.java @@ -18,6 +18,7 @@ import java.net.SocketAddress; import io.lettuce.core.event.EventBus; +import io.lettuce.core.event.connection.ConnectionActivatedEvent; import io.lettuce.core.event.connection.ConnectionDeactivatedEvent; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; @@ -43,6 +44,7 @@ class ConnectionEventTrigger extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { connectionEvents.fireEventRedisConnected(connection, ctx.channel().remoteAddress()); + eventBus.publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); super.channelActive(ctx); } diff --git a/src/main/java/io/lettuce/core/ConnectionEvents.java b/src/main/java/io/lettuce/core/ConnectionEvents.java index e0356db7ba..53f8b5033e 100644 --- a/src/main/java/io/lettuce/core/ConnectionEvents.java +++ b/src/main/java/io/lettuce/core/ConnectionEvents.java @@ -19,8 +19,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import io.lettuce.core.protocol.RedisCommand; - /** * Close Events Facility. Can register/unregister CloseListener and fire a closed event to all registered listeners. This class * is part of the internal API and may change without further notice. @@ -64,28 +62,6 @@ public void removeListener(RedisConnectionStateListener listener) { public static class Reset { } - /** - * Internal event when a channel is activated. - */ - public static class Activated { - } - - /** - * Internal event when a channel is activated. - */ - public static class HandshakeEvent { - - private final RedisCommand command; - - public HandshakeEvent(RedisCommand command) { - this.command = command; - } - - public RedisCommand getCommand() { - return command; - } - } - /** * Internal event when a reconnect is initiated. */ diff --git a/src/main/java/io/lettuce/core/ConnectionState.java b/src/main/java/io/lettuce/core/ConnectionState.java new file mode 100644 index 0000000000..e2cb65b91a --- /dev/null +++ b/src/main/java/io/lettuce/core/ConnectionState.java @@ -0,0 +1,173 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import io.lettuce.core.codec.StringCodec; +import io.lettuce.core.protocol.*; +import io.netty.channel.Channel; + +/** + * Internal connection state representing the requested {@link ProtocolVersion} and other options for connection initialization + * and connection state restoration. This class is part of the internal API. + * + * @author Mark Paluch + * @since 6.0 + */ +public class ConnectionState implements ConnectionInitializer { + + private final RedisCommandBuilder commandBuilder = new RedisCommandBuilder<>(StringCodec.UTF8); + + private ProtocolVersion requested; + private boolean pingOnConnect; + + private volatile String username; + private volatile char[] password; + private volatile int db; + private volatile boolean readOnly; + private volatile String clientName; + + public void setRequestedProtocolVersion(ProtocolVersion requested) { + this.requested = requested; + } + + public void setPingOnConnect(boolean pingOnConnect) { + this.pingOnConnect = pingOnConnect; + } + + public void setUsername(String username) { + this.username = username; + } + + public void setPassword(char[] password) { + this.password = password; + } + + public void setDb(int db) { + this.db = db; + } + + private boolean hasPassword() { + return this.password != null && this.password.length > 0; + } + + public void setReadOnly(boolean readOnly) { + this.readOnly = readOnly; + } + + public void setClientName(String clientName) { + this.clientName = clientName; + } + + @Override + public CompletionStage initialize(Channel channel) { + + CompletableFuture handshake; + + List> postHandshake = new ArrayList<>(); + + if (this.requested == ProtocolVersion.RESP2) { + handshake = initiateHandshakeResp2(channel); + + if (this.clientName != null) { + postHandshake.add(this.commandBuilder.clientSetname(this.clientName)); + } + } else if (this.requested == ProtocolVersion.RESP3) { + handshake = initiateHandshakeResp3(channel); + } else { + handshake = new CompletableFuture<>(); + handshake.completeExceptionally( + new RedisConnectionException("Protocol version" + this.requested + " not supported")); + } + + if (this.db > 0) { + postHandshake.add(this.commandBuilder.select(this.db)); + } + + if (this.readOnly) { + postHandshake.add(this.commandBuilder.readOnly()); + } + + if (!postHandshake.isEmpty()) { + + List> commands = new ArrayList<>(); + + for (RedisCommand redisCommand : postHandshake) { + + AsyncCommand async = new AsyncCommand<>(redisCommand); + handshake = handshake.thenCompose(o -> async); + commands.add(async); + } + + channel.writeAndFlush(commands); + } + + return handshake.thenRun(() -> { + }); + } + + /** + * Perform a RESP2 Handshake: Issue a {@code PING} or {@code AUTH}. + * + * @param channel + * @return + */ + private CompletableFuture initiateHandshakeResp2(Channel channel) { + + if (hasPassword()) { + return dispatch(channel, this.commandBuilder.auth(this.password)); + } else if (this.pingOnConnect) { + return dispatch(channel, this.commandBuilder.ping()); + } + + return CompletableFuture.completedFuture(null); + } + + /** + * Perform a RESP3 Handshake: Issue a {@code HELLO}. + * + * @param channel + * @return + */ + private CompletableFuture> initiateHandshakeResp3(Channel channel) { + + if (hasPassword()) { + + return dispatch(channel, this.commandBuilder.hello(3, + LettuceStrings.isNotEmpty(this.username) ? this.username : "default", this.password, this.clientName)); + } + + return dispatch(channel, this.commandBuilder.hello(3, null, null, this.clientName)); + } + + private AsyncCommand dispatch(Channel channel, Command command) { + + AsyncCommand future = new AsyncCommand<>(command); + + channel.writeAndFlush(future).addListener(writeFuture -> { + + if (!writeFuture.isSuccess()) { + future.completeExceptionally(writeFuture.cause()); + } + }); + return future; + } +} diff --git a/src/main/java/io/lettuce/core/PlainChannelInitializer.java b/src/main/java/io/lettuce/core/PlainChannelInitializer.java deleted file mode 100644 index 5b0e742d1e..0000000000 --- a/src/main/java/io/lettuce/core/PlainChannelInitializer.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Copyright 2011-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.lettuce.core; - -import static io.lettuce.core.ConnectionEventTrigger.local; -import static io.lettuce.core.ConnectionEventTrigger.remote; - -import java.time.Duration; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -import io.lettuce.core.ConnectionEvents.HandshakeEvent; -import io.lettuce.core.event.connection.ConnectedEvent; -import io.lettuce.core.event.connection.ConnectionActivatedEvent; -import io.lettuce.core.event.connection.DisconnectedEvent; -import io.lettuce.core.protocol.AsyncCommand; -import io.lettuce.core.resource.ClientResources; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.util.Timeout; - -/** - * @author Mark Paluch - */ -class PlainChannelInitializer extends io.netty.channel.ChannelInitializer implements RedisChannelInitializer { - - private final Supplier> handlers; - private final Supplier> handshakeCommandSupplier; - private final ClientResources clientResources; - private final Duration timeout; - - private volatile CompletableFuture initializedFuture = new CompletableFuture<>(); - - PlainChannelInitializer(Supplier> handshakeCommandSupplier, Supplier> handlers, - ClientResources clientResources, Duration timeout) { - this.handshakeCommandSupplier = handshakeCommandSupplier; - this.handlers = handlers; - this.clientResources = clientResources; - this.timeout = timeout; - } - - @Override - protected void initChannel(Channel channel) throws Exception { - doInitialize(channel); - } - - private void doInitialize(Channel channel) { - - if (channel.pipeline().get("channelActivator") == null) { - - channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() { - - private AsyncCommand handshakeCommand; - - @Override - public CompletableFuture channelInitialized() { - return initializedFuture; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - - clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx))); - - if (!initializedFuture.isDone()) { - initializedFuture.completeExceptionally(new RedisConnectionException("Connection closed prematurely")); - } - - initializedFuture = new CompletableFuture<>(); - handshakeCommand = null; - super.channelInactive(ctx); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - - if (evt instanceof ConnectionEvents.Activated) { - if (!initializedFuture.isDone()) { - initializedFuture.complete(true); - clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); - } - } - super.userEventTriggered(ctx, evt); - } - - @Override - public void channelActive(final ChannelHandlerContext ctx) throws Exception { - - clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx))); - - if (ConnectionBuilder.isHandshakeEnabled(handshakeCommandSupplier)) { - handshakeCommand = handshakeCommandSupplier.get(); - sendHandshake(handshakeCommand, initializedFuture, ctx, clientResources, timeout); - } else { - super.channelActive(ctx); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (!initializedFuture.isDone()) { - initializedFuture.completeExceptionally(cause); - } - super.exceptionCaught(ctx, cause); - } - }); - } - - for (ChannelHandler handler : handlers.get()) { - channel.pipeline().addLast(handler); - } - - clientResources.nettyCustomizer().afterChannelInitialized(channel); - } - - static void sendHandshake(AsyncCommand cmd, CompletableFuture initializedFuture, - ChannelHandlerContext ctx, ClientResources clientResources, Duration timeout) throws Exception { - - ctx.fireUserEventTriggered(new HandshakeEvent(cmd)); - - Runnable timeoutGuard = () -> { - - if (cmd.isDone() || initializedFuture.isDone()) { - return; - } - - initializedFuture.completeExceptionally( - ExceptionFactory.createTimeoutException(String.format("Cannot initialize channel (Handshake %s)", cmd.getType().name()), timeout)); - }; - - Timeout timeoutHandle = clientResources.timer().newTimeout(t -> { - - if (clientResources.eventExecutorGroup().isShuttingDown()) { - timeoutGuard.run(); - return; - } - - clientResources.eventExecutorGroup().submit(timeoutGuard); - }, timeout.toNanos(), TimeUnit.NANOSECONDS); - - cmd.whenComplete((o, throwable) -> { - - timeoutHandle.cancel(); - - if (throwable == null) { - ctx.fireChannelActive(); - initializedFuture.complete(true); - } else { - initializedFuture.completeExceptionally(throwable); - } - }); - } - - @Override - public CompletableFuture channelInitialized() { - return initializedFuture; - } - -} diff --git a/src/main/java/io/lettuce/core/RedisChannelInitializer.java b/src/main/java/io/lettuce/core/RedisChannelInitializer.java deleted file mode 100644 index be576b2cde..0000000000 --- a/src/main/java/io/lettuce/core/RedisChannelInitializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2011-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.lettuce.core; - -import java.util.concurrent.CompletableFuture; - -import io.netty.channel.ChannelHandler; - -/** - * Channel initializer to set up the transport before a Redis connection can be used. This is part of the internal API. This - * class is part of the internal API. - * - * @author Mark Paluch - */ -public interface RedisChannelInitializer extends ChannelHandler { - - /** - * @return future to synchronize channel initialization. Returns a new future for every reconnect. - */ - CompletableFuture channelInitialized(); -} diff --git a/src/main/java/io/lettuce/core/RedisChannelInitializerImpl.java b/src/main/java/io/lettuce/core/RedisChannelInitializerImpl.java deleted file mode 100644 index 96c743df56..0000000000 --- a/src/main/java/io/lettuce/core/RedisChannelInitializerImpl.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2011-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.lettuce.core; - -import io.netty.channel.ChannelDuplexHandler; - -/** - * Channel initializer to set up the transport before a Redis connection can be used. This class is part of the internal API. - * - * @author Mark Paluch - */ -abstract class RedisChannelInitializerImpl extends ChannelDuplexHandler implements RedisChannelInitializer { -} diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index 19cea5737c..2fa64f3509 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -35,8 +35,10 @@ import io.lettuce.core.codec.StringCodec; import io.lettuce.core.internal.Futures; import io.lettuce.core.internal.LettuceAssert; -import io.lettuce.core.output.StatusOutput; -import io.lettuce.core.protocol.*; +import io.lettuce.core.protocol.CommandExpiryWriter; +import io.lettuce.core.protocol.CommandHandler; +import io.lettuce.core.protocol.DefaultEndpoint; +import io.lettuce.core.protocol.Endpoint; import io.lettuce.core.pubsub.PubSubCommandHandler; import io.lettuce.core.pubsub.PubSubEndpoint; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -271,7 +273,7 @@ private ConnectionFuture> connectStandalone } StatefulRedisConnectionImpl connection = newStatefulRedisConnection(writer, codec, timeout); - ConnectionFuture> future = connectStatefulAsync(connection, codec, endpoint, redisURI, + ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, () -> new CommandHandler(clientOptions, clientResources, endpoint)); future.whenComplete((channelHandler, throwable) -> { @@ -285,8 +287,8 @@ private ConnectionFuture> connectStandalone } @SuppressWarnings("unchecked") - private ConnectionFuture connectStatefulAsync(StatefulRedisConnectionImpl connection, - RedisCodec codec, Endpoint endpoint, RedisURI redisURI, Supplier commandHandlerSupplier) { + private ConnectionFuture connectStatefulAsync(StatefulRedisConnectionImpl connection, Endpoint endpoint, + RedisURI redisURI, Supplier commandHandlerSupplier) { ConnectionBuilder connectionBuilder; if (redisURI.isSsl()) { @@ -297,44 +299,23 @@ private ConnectionFuture connectStatefulAsync(StatefulRedisConnecti connectionBuilder = ConnectionBuilder.connectionBuilder(); } + ConnectionState state = connection.getConnectionState(); + + initializeConnectionState(redisURI, state); + state.setDb(redisURI.getDatabase()); + connectionBuilder.connection(connection); connectionBuilder.clientOptions(clientOptions); connectionBuilder.clientResources(clientResources); connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint); connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI); + connectionBuilder.connectionInitializer(state); channelType(connectionBuilder, redisURI); ConnectionFuture> future = initializeChannelAsync(connectionBuilder); - ConnectionFuture sync = future; - - if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP2) { - - if (!clientOptions.isPingBeforeActivateConnection() && connectionBuilder.hasPassword(redisURI)) { - sync = sync.thenCompose(channelHandler -> { - - CommandArgs args = new CommandArgs<>(codec).add(redisURI.getPassword()); - return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args); - }); - } - - if (LettuceStrings.isNotEmpty(redisURI.getClientName())) { - sync = sync.thenApply(channelHandler -> { - connection.setClientName(redisURI.getClientName()); - return channelHandler; - }); - } - } - if (redisURI.getDatabase() != 0) { - - sync = sync.thenCompose(channelHandler -> { - CommandArgs args = new CommandArgs<>(codec).add(redisURI.getDatabase()); - return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args); - }); - } - - return sync.thenApply(channelHandler -> (S) connection); + return future.thenApply(channelHandler -> (S) connection); } /** @@ -422,8 +403,8 @@ private ConnectionFuture> connectPubS StatefulRedisPubSubConnectionImpl connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout); - ConnectionFuture> future = connectStatefulAsync(connection, codec, endpoint, - redisURI, () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint)); + ConnectionFuture> future = connectStatefulAsync(connection, endpoint, redisURI, + () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint)); return future.whenComplete((conn, throwable) -> { @@ -516,7 +497,7 @@ private CompletableFuture> connectS logger.debug("Trying to get a Redis Sentinel connection for one of: " + redisURI.getSentinels()); if (redisURI.getSentinels().isEmpty() && (isNotEmpty(redisURI.getHost()) || !isEmpty(redisURI.getSocket()))) { - return doConnectSentinelAsync(codec, redisURI.getClientName(), redisURI, timeout).toCompletableFuture(); + return doConnectSentinelAsync(codec, redisURI, timeout).toCompletableFuture(); } List sentinels = redisURI.getSentinels(); @@ -527,10 +508,8 @@ private CompletableFuture> connectS for (RedisURI uri : sentinels) { - String clientName = LettuceStrings.isNotEmpty(uri.getClientName()) ? uri.getClientName() : redisURI.getClientName(); - Mono> connectionMono = Mono - .fromCompletionStage(() -> doConnectSentinelAsync(codec, clientName, uri, timeout)) + .fromCompletionStage(() -> doConnectSentinelAsync(codec, uri, timeout)) .onErrorMap(CompletionException.class, Throwable::getCause) .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e)) .doOnError(exceptionCollector::add); @@ -565,7 +544,7 @@ private CompletableFuture> connectS } private ConnectionFuture> doConnectSentinelAsync(RedisCodec codec, - String clientName, RedisURI redisURI, Duration timeout) { + RedisURI redisURI, Duration timeout) { ConnectionBuilder connectionBuilder = ConnectionBuilder.connectionBuilder(); connectionBuilder.clientOptions(ClientOptions.copyOf(getOptions())); @@ -579,6 +558,10 @@ private ConnectionFuture> doConnect } StatefulRedisSentinelConnectionImpl connection = newStatefulRedisSentinelConnection(writer, codec, timeout); + ConnectionState connectionState = connection.getConnectionState(); + connectionBuilder.connectionInitializer(connectionState); + + initializeConnectionState(redisURI, connectionState); logger.debug("Connecting to Redis Sentinel, address: " + redisURI); @@ -589,13 +572,6 @@ private ConnectionFuture> doConnect channelType(connectionBuilder, redisURI); ConnectionFuture sync = initializeChannelAsync(connectionBuilder); - if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP2 && LettuceStrings.isNotEmpty(clientName)) { - sync = sync.thenApply(channelHandler -> { - connection.setClientName(clientName); - return channelHandler; - }); - } - return sync.thenApply(ignore -> (StatefulRedisSentinelConnection) connection).whenComplete((ignore, e) -> { if (e != null) { @@ -770,6 +746,15 @@ private Mono lookupRedis(RedisURI sentinelUri) { }); } + private void initializeConnectionState(RedisURI redisURI, ConnectionState state) { + + state.setRequestedProtocolVersion(clientOptions.getProtocolVersion()); + state.setPingOnConnect(clientOptions.isPingBeforeActivateConnection()); + state.setClientName(redisURI.getClientName()); + state.setUsername(redisURI.getUsername()); + state.setPassword(redisURI.getPassword()); + } + private static ConnectionFuture transformAsyncConnectionException(ConnectionFuture future) { return future.thenCompose((v, e) -> { diff --git a/src/main/java/io/lettuce/core/RedisCommandBuilder.java b/src/main/java/io/lettuce/core/RedisCommandBuilder.java index d6a694c387..241755d381 100644 --- a/src/main/java/io/lettuce/core/RedisCommandBuilder.java +++ b/src/main/java/io/lettuce/core/RedisCommandBuilder.java @@ -26,7 +26,6 @@ import io.lettuce.core.XReadArgs.StreamOffset; import io.lettuce.core.codec.RedisCodec; import io.lettuce.core.codec.StringCodec; -import io.lettuce.core.codec.StringCodec; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.output.*; import io.lettuce.core.protocol.*; @@ -853,7 +852,7 @@ Command hdel(K key, K... fields) { return createCommand(HDEL, new IntegerOutput<>(codec), args); } - Command> hello(int protocolVersion, byte[] user, byte password[], byte[] name) { + Command> hello(int protocolVersion, String user, char[] password, String name) { CommandArgs args = new CommandArgs<>(StringCodec.ASCII).add(protocolVersion); diff --git a/src/main/java/io/lettuce/core/SslConnectionBuilder.java b/src/main/java/io/lettuce/core/SslConnectionBuilder.java index 9e74e6ee4c..ba46439c46 100644 --- a/src/main/java/io/lettuce/core/SslConnectionBuilder.java +++ b/src/main/java/io/lettuce/core/SslConnectionBuilder.java @@ -15,36 +15,22 @@ */ package io.lettuce.core; -import static io.lettuce.core.ConnectionEventTrigger.local; -import static io.lettuce.core.ConnectionEventTrigger.remote; -import static io.lettuce.core.PlainChannelInitializer.sendHandshake; - import java.io.IOException; import java.security.GeneralSecurityException; -import java.time.Duration; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLParameters; -import io.lettuce.core.event.connection.ConnectedEvent; -import io.lettuce.core.event.connection.ConnectionActivatedEvent; -import io.lettuce.core.event.connection.DisconnectedEvent; import io.lettuce.core.internal.LettuceAssert; -import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.resource.ClientResources; import io.netty.channel.Channel; -import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslHandler; -import io.netty.handler.ssl.SslHandshakeCompletionEvent; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; /** @@ -75,35 +61,23 @@ protected List buildHandlers() { } @Override - public RedisChannelInitializer build() { - - return new SslChannelInitializer(getHandshakeCommandSupplier(), this::buildHandlers, redisURI, clientResources(), - getTimeout(), clientOptions().getSslOptions()); + public ChannelInitializer build() { + return new SslChannelInitializer(this::buildHandlers, redisURI, clientResources(), clientOptions().getSslOptions()); } - /** - * @author Mark Paluch - */ - static class SslChannelInitializer extends io.netty.channel.ChannelInitializer implements RedisChannelInitializer { + static class SslChannelInitializer extends io.netty.channel.ChannelInitializer { - private final Supplier> handshakeCommandSupplier; private final Supplier> handlers; private final RedisURI redisURI; private final ClientResources clientResources; - private final Duration timeout; private final SslOptions sslOptions; - private volatile CompletableFuture initializedFuture = new CompletableFuture<>(); - - public SslChannelInitializer(Supplier> handshakeCommandSupplier, - Supplier> handlers, RedisURI redisURI, ClientResources clientResources, Duration timeout, - SslOptions sslOptions) { + public SslChannelInitializer(Supplier> handlers, RedisURI redisURI, + ClientResources clientResources, SslOptions sslOptions) { - this.handshakeCommandSupplier = handshakeCommandSupplier; this.handlers = handlers; this.redisURI = redisURI; this.clientResources = clientResources; - this.timeout = timeout; this.sslOptions = sslOptions; } @@ -114,7 +88,7 @@ protected void initChannel(Channel channel) throws Exception { private void doInitialize(Channel channel) throws IOException, GeneralSecurityException { - SSLParameters sslParams = sslOptions.createSSLParameters(); + SSLParameters sslParams = sslOptions.createSSLParameters(); SslContextBuilder sslContextBuilder = sslOptions.createSslContextBuilder(); if (redisURI.isVerifyPeer()) { @@ -128,103 +102,14 @@ private void doInitialize(Channel channel) throws IOException, GeneralSecurityEx SSLEngine sslEngine = sslContext.newEngine(channel.alloc(), redisURI.getHost(), redisURI.getPort()); sslEngine.setSSLParameters(sslParams); - if (channel.pipeline().get("first") == null) { - channel.pipeline().addFirst("first", new ChannelDuplexHandler() { - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - clientResources.eventBus().publish(new ConnectedEvent(local(ctx), remote(ctx))); - super.channelActive(ctx); - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - clientResources.eventBus().publish(new DisconnectedEvent(local(ctx), remote(ctx))); - super.channelInactive(ctx); - } - }); - } - SslHandler sslHandler = new SslHandler(sslEngine, redisURI.isStartTls()); channel.pipeline().addLast(sslHandler); - if (channel.pipeline().get("channelActivator") == null) { - channel.pipeline().addLast("channelActivator", new RedisChannelInitializerImpl() { - - private AsyncCommand handshakeCommand; - - @Override - public CompletableFuture channelInitialized() { - return initializedFuture; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - - if (!initializedFuture.isDone()) { - initializedFuture - .completeExceptionally(new RedisConnectionException("Connection closed prematurely")); - } - - initializedFuture = new CompletableFuture<>(); - handshakeCommand = null; - super.channelInactive(ctx); - } - - @Override - public void channelActive(ChannelHandlerContext ctx) throws Exception { - if (initializedFuture.isDone()) { - super.channelActive(ctx); - } - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof SslHandshakeCompletionEvent && !initializedFuture.isDone()) { - - SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt; - if (event.isSuccess()) { - if (ConnectionBuilder.isHandshakeEnabled(handshakeCommandSupplier)) { - handshakeCommand = handshakeCommandSupplier.get(); - sendHandshake(handshakeCommand, initializedFuture, ctx, clientResources, timeout); - } else { - ctx.fireChannelActive(); - } - } else { - initializedFuture.completeExceptionally(event.cause()); - } - } - - if (evt instanceof ConnectionEvents.Activated) { - if (!initializedFuture.isDone()) { - initializedFuture.complete(true); - clientResources.eventBus().publish(new ConnectionActivatedEvent(local(ctx), remote(ctx))); - } - } - - super.userEventTriggered(ctx, evt); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (cause instanceof SSLHandshakeException || cause.getCause() instanceof SSLException) { - initializedFuture.completeExceptionally(cause); - } - super.exceptionCaught(ctx, cause); - } - }); - } - for (ChannelHandler handler : handlers.get()) { channel.pipeline().addLast(handler); } clientResources.nettyCustomizer().afterChannelInitialized(channel); } - - @Override - public CompletableFuture channelInitialized() { - return initializedFuture; - } } } diff --git a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java index d14ce456c7..7a588c4513 100644 --- a/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java +++ b/src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java @@ -50,12 +50,9 @@ public class StatefulRedisConnectionImpl extends RedisChannelHandler protected final RedisCommands sync; protected final RedisAsyncCommandsImpl async; protected final RedisReactiveCommandsImpl reactive; + private final ConnectionState state = new ConnectionState(); protected MultiOutput multi; - private char[] password; - private int db; - private boolean readOnly; - private String clientName; /** * Initialize a new connection. @@ -121,28 +118,6 @@ public boolean isMulti() { return multi != null; } - @Override - public void activated() { - - super.activated(); - // do not block in here, since the channel flow will be interrupted. - if (password != null) { - async.auth(password); - } - - if (db != 0) { - async.select(db); - } - - if (clientName != null) { - setClientName(clientName); - } - - if (readOnly) { - async.readOnly(); - } - } - @Override public RedisCommand dispatch(RedisCommand command) { @@ -185,12 +160,12 @@ protected RedisCommand preProcessCommand(RedisCommand comm char[] password = CommandArgsAccessor.getFirstCharArray(command.getArgs()); if (password != null) { - this.password = password; + state.setPassword(password); } else { String stringPassword = CommandArgsAccessor.getFirstString(command.getArgs()); if (stringPassword != null) { - this.password = stringPassword.toCharArray(); + state.setPassword(stringPassword.toCharArray()); } } } @@ -202,7 +177,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm if ("OK".equals(status)) { Long db = CommandArgsAccessor.getFirstInteger(command.getArgs()); if (db != null) { - this.db = db.intValue(); + state.setDb(db.intValue()); } } }); @@ -211,7 +186,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm if (local.getType().name().equals(READONLY.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { - this.readOnly = true; + state.setReadOnly(true); } }); } @@ -219,7 +194,7 @@ protected RedisCommand preProcessCommand(RedisCommand comm if (local.getType().name().equals(READWRITE.name())) { local = attachOnComplete(local, status -> { if ("OK".equals(status)) { - this.readOnly = false; + state.setReadOnly(false); } }); } @@ -256,13 +231,22 @@ private RedisCommand attachOnComplete(RedisCommand command return command; } + /** + * @param clientName + * @deprecated since 6.0, use {@link RedisAsyncCommands#clientSetname(Object)}. + */ + @Deprecated public void setClientName(String clientName) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName); - AsyncCommand async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>( - StringCodec.UTF8), args)); - this.clientName = clientName; + AsyncCommand async = new AsyncCommand<>( + new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args)); + state.setClientName(clientName); dispatch((RedisCommand) async); } + + public ConnectionState getConnectionState() { + return state; + } } diff --git a/src/main/java/io/lettuce/core/cluster/RedisState.java b/src/main/java/io/lettuce/core/cluster/CommandSet.java similarity index 94% rename from src/main/java/io/lettuce/core/cluster/RedisState.java rename to src/main/java/io/lettuce/core/cluster/CommandSet.java index 34ba42011f..21a4a4cee7 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisState.java +++ b/src/main/java/io/lettuce/core/cluster/CommandSet.java @@ -24,17 +24,17 @@ /** * Value object representing the current Redis state regarding its commands. *

- * {@link RedisState} caches command details and uses {@link CommandType}. + * {@link CommandSet} caches command details and uses {@link CommandType}. * * @author Mark Paluch * @since 4.4 */ -class RedisState { +class CommandSet { private final Map commands; private final EnumSet availableCommands = EnumSet.noneOf(CommandType.class); - public RedisState(Collection commands) { + public CommandSet(Collection commands) { Map map = new HashMap<>(); diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java index 3db79f3212..b1c74e398b 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterAsyncCommandsImpl.java @@ -649,8 +649,8 @@ private RedisClusterAsyncCommands findConnectionBySlot(int slot) { return null; } - private RedisState getRedisState() { - return ((StatefulRedisClusterConnectionImpl) super.getConnection()).getState(); + private CommandSet getRedisState() { + return ((StatefulRedisClusterConnectionImpl) super.getConnection()).getCommandSet(); } private boolean hasRedisState() { diff --git a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java index b117adc549..3097b05034 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisAdvancedClusterReactiveCommandsImpl.java @@ -573,8 +573,8 @@ private Mono> findConnectionBySlotReactive(in return Mono.error(new RedisException("No partition for slot " + slot)); } - private RedisState getRedisState() { - return ((StatefulRedisClusterConnectionImpl) super.getConnection()).getState(); + private CommandSet getRedisState() { + return ((StatefulRedisClusterConnectionImpl) super.getConnection()).getCommandSet(); } private boolean hasRedisState() { diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index 16db9b4ed3..332f9a567c 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -53,7 +53,6 @@ import io.lettuce.core.protocol.CommandExpiryWriter; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.DefaultEndpoint; -import io.lettuce.core.protocol.ProtocolVersion; import io.lettuce.core.pubsub.PubSubCommandHandler; import io.lettuce.core.pubsub.PubSubEndpoint; import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; @@ -498,7 +497,7 @@ ConnectionFuture> connectToNodeAsync(RedisC StatefulRedisConnectionImpl connection = new StatefulRedisConnectionImpl<>(writer, codec, timeout); - ConnectionFuture> connectionFuture = connectStatefulAsync(connection, codec, endpoint, + ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, () -> new CommandHandler(clientOptions, clientResources, endpoint)); return connectionFuture.whenComplete((conn, throwable) -> { @@ -539,7 +538,7 @@ ConnectionFuture> connectPubSubToNode StatefulRedisPubSubConnectionImpl connection = new StatefulRedisPubSubConnectionImpl<>(endpoint, writer, codec, timeout); - ConnectionFuture> connectionFuture = connectStatefulAsync(connection, codec, + ConnectionFuture> connectionFuture = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, () -> new PubSubCommandHandler<>(clientOptions, clientResources, codec, endpoint)); return connectionFuture.whenComplete((conn, throwable) -> { @@ -593,30 +592,39 @@ private CompletableFuture> connectCl Supplier commandHandlerSupplier = () -> new CommandHandler(clientOptions, clientResources, endpoint); Mono> connectionMono = Mono - .defer(() -> connect(socketAddressSupplier, codec, endpoint, connection, commandHandlerSupplier)); + .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); for (int i = 1; i < getConnectionAttempts(); i++) { connectionMono = connectionMono - .onErrorResume(t -> connect(socketAddressSupplier, codec, endpoint, connection, commandHandlerSupplier)); + .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); } return connectionMono.flatMap(c -> c.reactive().command().collectList() // .map(CommandDetailParser::parse) // - .doOnNext(detail -> c.setState(new RedisState(detail))) + .doOnNext(detail -> c.setCommandSet(new CommandSet(detail))) // - .doOnError(e -> c.setState(new RedisState(Collections.emptyList()))).then(Mono.just(c)) + .doOnError(e -> c.setCommandSet(new CommandSet(Collections.emptyList()))).then(Mono.just(c)) .onErrorResume(RedisCommandExecutionException.class, e -> Mono.just(c))) .doOnNext( c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider)) .map(it -> (StatefulRedisClusterConnection) it).toFuture(); } - private Mono connect(Mono socketAddressSupplier, RedisCodec codec, - DefaultEndpoint endpoint, RedisChannelHandler connection, Supplier commandHandlerSupplier) { + private Mono connect(Mono socketAddressSupplier, DefaultEndpoint endpoint, + StatefulRedisClusterConnectionImpl connection, Supplier commandHandlerSupplier) { - ConnectionFuture future = connectStatefulAsync(connection, codec, endpoint, getFirstUri(), socketAddressSupplier, + ConnectionFuture future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, + commandHandlerSupplier); + + return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage())); + } + + private Mono connect(Mono socketAddressSupplier, DefaultEndpoint endpoint, + StatefulRedisConnectionImpl connection, Supplier commandHandlerSupplier) { + + ConnectionFuture future = connectStatefulAsync(connection, endpoint, getFirstUri(), socketAddressSupplier, commandHandlerSupplier); return Mono.fromCompletionStage(future).doOnError(t -> logger.warn(t.getMessage())); @@ -667,19 +675,19 @@ private CompletableFuture> con codec, endpoint); Mono> connectionMono = Mono - .defer(() -> connect(socketAddressSupplier, codec, endpoint, connection, commandHandlerSupplier)); + .defer(() -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); for (int i = 1; i < getConnectionAttempts(); i++) { connectionMono = connectionMono - .onErrorResume(t -> connect(socketAddressSupplier, codec, endpoint, connection, commandHandlerSupplier)); + .onErrorResume(t -> connect(socketAddressSupplier, endpoint, connection, commandHandlerSupplier)); } return connectionMono.flatMap(c -> c.reactive().command().collectList() // .map(CommandDetailParser::parse) // - .doOnNext(detail -> c.setState(new RedisState(detail))) - .doOnError(e -> c.setState(new RedisState(Collections.emptyList()))).then(Mono.just(c)) + .doOnNext(detail -> c.setCommandSet(new CommandSet(detail))) + .doOnError(e -> c.setCommandSet(new CommandSet(Collections.emptyList()))).then(Mono.just(c)) .onErrorResume(RedisCommandExecutionException.class, e -> Mono.just(c))) .doOnNext( c -> connection.registerCloseables(closeableResources, clusterWriter, pooledClusterConnectionProvider)) @@ -694,39 +702,37 @@ private int getConnectionAttempts() { * Initiates a channel connection considering {@link ClientOptions} initialization options, authentication and client name * options. */ - private , S> ConnectionFuture connectStatefulAsync(T connection, - RedisCodec codec, DefaultEndpoint endpoint, RedisURI connectionSettings, + private , S> ConnectionFuture connectStatefulAsync(T connection, + DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, Supplier commandHandlerSupplier) { - ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, endpoint, connectionSettings, + ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, + connectionSettings, socketAddressSupplier, commandHandlerSupplier); ConnectionFuture> future = initializeChannelAsync(connectionBuilder); - ConnectionFuture sync = future; - if (clientOptions.getProtocolVersion() == ProtocolVersion.RESP2 - && LettuceStrings.isNotEmpty(connectionSettings.getClientName())) { - sync = sync.thenApply(channelHandler -> { + return future.thenApply(channelHandler -> (S) connection); + } - if (connection instanceof StatefulRedisClusterConnectionImpl) { - ((StatefulRedisClusterConnectionImpl) connection).setClientName(connectionSettings.getClientName()); - } + /** + * Initiates a channel connection considering {@link ClientOptions} initialization options, authentication and client name + * options. + */ + private , S> ConnectionFuture connectStatefulAsync(T connection, + DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, + Supplier commandHandlerSupplier) { - if (connection instanceof StatefulRedisConnectionImpl) { - ((StatefulRedisConnectionImpl) connection).setClientName(connectionSettings.getClientName()); - } - return channelHandler; - }); - } + ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, + connectionSettings, socketAddressSupplier, commandHandlerSupplier); - return sync.thenApply(channelHandler -> (S) connection); - } + ConnectionFuture> future = initializeChannelAsync(connectionBuilder); - private boolean hasPassword(RedisURI connectionSettings) { - return connectionSettings.getPassword() != null && connectionSettings.getPassword().length != 0; + return future.thenApply(channelHandler -> (S) connection); } - private ConnectionBuilder createConnectionBuilder(RedisChannelHandler connection, DefaultEndpoint endpoint, + private ConnectionBuilder createConnectionBuilder(RedisChannelHandler connection, ConnectionState state, + DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, Supplier commandHandlerSupplier) { @@ -739,10 +745,18 @@ private ConnectionBuilder createConnectionBuilder(RedisChannelHandler c @Override public RedisFuture> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadius_ro(key, longitude, latitude, distance, unit); } @@ -75,7 +75,7 @@ public RedisFuture> georadius(K key, double longitude, double latitude, d public RedisFuture>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @@ -85,7 +85,7 @@ public RedisFuture>> georadius(K key, double longitude, double @Override public RedisFuture> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUSBYMEMBER_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUSBYMEMBER_RO)) { return super.georadiusbymember_ro(key, member, distance, unit); } @@ -96,7 +96,7 @@ public RedisFuture> georadiusbymember(K key, V member, double distance, G public RedisFuture>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUSBYMEMBER_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUSBYMEMBER_RO)) { return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); } diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubReactiveCommandsImpl.java b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubReactiveCommandsImpl.java index 458327a5f7..bf7c445f39 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubReactiveCommandsImpl.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterPubSubReactiveCommandsImpl.java @@ -63,7 +63,7 @@ public RedisClusterPubSubReactiveCommandsImpl(StatefulRedisPubSubConnection georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadius_ro(key, longitude, latitude, distance, unit); } @@ -74,7 +74,7 @@ public Flux georadius(K key, double longitude, double latitude, double distan public Flux> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs); } @@ -84,7 +84,7 @@ public Flux> georadius(K key, double longitude, double latitude, do @Override public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadiusbymember_ro(key, member, distance, unit); } @@ -94,7 +94,7 @@ public Flux georadiusbymember(K key, V member, double distance, GeoArgs.Unit @Override public Flux> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) { - if (getStatefulConnection().getState().hasCommand(CommandType.GEORADIUS_RO)) { + if (getStatefulConnection().getCommandSet().hasCommand(CommandType.GEORADIUS_RO)) { return super.georadiusbymember_ro(key, member, distance, unit, geoArgs); } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java index a11b8d59c5..9bd8d4a365 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterConnectionImpl.java @@ -40,10 +40,11 @@ import io.lettuce.core.cluster.models.partitions.Partitions; import io.lettuce.core.cluster.models.partitions.RedisClusterNode; import io.lettuce.core.codec.RedisCodec; -import io.lettuce.core.codec.StringCodec; import io.lettuce.core.internal.LettuceAssert; -import io.lettuce.core.output.StatusOutput; -import io.lettuce.core.protocol.*; +import io.lettuce.core.protocol.CommandArgsAccessor; +import io.lettuce.core.protocol.CompleteableCommand; +import io.lettuce.core.protocol.ConnectionWatchdog; +import io.lettuce.core.protocol.RedisCommand; /** * A thread-safe connection to a Redis Cluster. Multiple threads may share one {@link StatefulRedisClusterConnectionImpl} @@ -54,21 +55,18 @@ * @author Mark Paluch * @since 4.0 */ -public class StatefulRedisClusterConnectionImpl extends RedisChannelHandler implements - StatefulRedisClusterConnection { - - private Partitions partitions; - - private char[] password; - private boolean readOnly; - private String clientName; +public class StatefulRedisClusterConnectionImpl extends RedisChannelHandler + implements StatefulRedisClusterConnection { protected final RedisCodec codec; protected final RedisAdvancedClusterCommands sync; protected final RedisAdvancedClusterAsyncCommandsImpl async; protected final RedisAdvancedClusterReactiveCommandsImpl reactive; - private volatile RedisState state; + private final ConnectionState connectionState = new ConnectionState(); + + private Partitions partitions; + private volatile CommandSet commandSet; /** * Initialize a new connection. @@ -108,12 +106,12 @@ public RedisAdvancedClusterReactiveCommands reactive() { return reactive; } - RedisState getState() { - return state; + CommandSet getCommandSet() { + return commandSet; } - void setState(RedisState state) { - this.state = state; + void setCommandSet(CommandSet commandSet) { + this.commandSet = commandSet; } private RedisURI lookup(String nodeId) { @@ -135,8 +133,8 @@ public StatefulRedisConnection getConnection(String nodeId) { throw new RedisException("NodeId " + nodeId + " does not belong to the cluster"); } - return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection( - ClusterConnectionProvider.Intent.WRITE, nodeId); + return getClusterDistributionChannelWriter().getClusterConnectionProvider() + .getConnection(ClusterConnectionProvider.Intent.WRITE, nodeId); } @Override @@ -157,8 +155,8 @@ public CompletableFuture> getConnectionAsync(Strin @Override public StatefulRedisConnection getConnection(String host, int port) { - return getClusterDistributionChannelWriter().getClusterConnectionProvider().getConnection( - ClusterConnectionProvider.Intent.WRITE, host, port); + return getClusterDistributionChannelWriter().getClusterConnectionProvider() + .getConnection(ClusterConnectionProvider.Intent.WRITE, host, port); } @Override @@ -174,34 +172,6 @@ ClusterDistributionChannelWriter getClusterDistributionChannelWriter() { return (ClusterDistributionChannelWriter) super.getChannelWriter(); } - @Override - public void activated() { - - super.activated(); - // do not block in here, since the channel flow will be interrupted. - if (password != null) { - async.auth(password); - } - - if (clientName != null) { - setClientName(clientName); - } - - if (readOnly) { - async.readOnly(); - } - } - - void setClientName(String clientName) { - - CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName); - AsyncCommand async = new AsyncCommand<>(new Command<>(CommandType.CLIENT, new StatusOutput<>( - StringCodec.UTF8), args)); - this.clientName = clientName; - - dispatch((RedisCommand) async); - } - @Override public RedisCommand dispatch(RedisCommand command) { return super.dispatch(preProcessCommand(command)); @@ -228,12 +198,12 @@ private RedisCommand preProcessCommand(RedisCommand comman char[] password = CommandArgsAccessor.getFirstCharArray(command.getArgs()); if (password != null) { - this.password = password; + this.connectionState.setPassword(password); } else { String stringPassword = CommandArgsAccessor.getFirstString(command.getArgs()); if (stringPassword != null) { - this.password = stringPassword.toCharArray(); + this.connectionState.setPassword(stringPassword.toCharArray()); } } } @@ -243,7 +213,7 @@ private RedisCommand preProcessCommand(RedisCommand comman if (local.getType().name().equals(READONLY.name())) { local = attachOnComplete(local, status -> { if (status.equals("OK")) { - this.readOnly = true; + this.connectionState.setReadOnly(true); } }); } @@ -251,7 +221,7 @@ private RedisCommand preProcessCommand(RedisCommand comman if (local.getType().name().equals(READWRITE.name())) { local = attachOnComplete(local, status -> { if (status.equals("OK")) { - this.readOnly = false; + this.connectionState.setReadOnly(false); } }); } @@ -286,4 +256,8 @@ public void setReadFrom(ReadFrom readFrom) { public ReadFrom getReadFrom() { return getClusterDistributionChannelWriter().getReadFrom(); } + + ConnectionState getConnectionState() { + return connectionState; + } } diff --git a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java index 6d1e083df9..602b05cec6 100644 --- a/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java +++ b/src/main/java/io/lettuce/core/cluster/StatefulRedisClusterPubSubConnectionImpl.java @@ -49,7 +49,7 @@ class StatefulRedisClusterPubSubConnectionImpl extends StatefulRedisPubSub private final PubSubClusterEndpoint endpoint; private volatile Partitions partitions; - private volatile RedisState state; + private volatile CommandSet commandSet; /** * Initialize a new connection. @@ -104,12 +104,12 @@ protected RedisPubSubReactiveCommandsImpl newRedisReactiveCommandsImpl() { return new RedisClusterPubSubReactiveCommandsImpl(this, codec); } - RedisState getState() { - return state; + CommandSet getCommandSet() { + return commandSet; } - void setState(RedisState state) { - this.state = state; + void setCommandSet(CommandSet commandSet) { + this.commandSet = commandSet; } @Override diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 3859ae6698..a2379da932 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -15,8 +15,6 @@ */ package io.lettuce.core.protocol; -import static io.lettuce.core.ConnectionEvents.Activated; -import static io.lettuce.core.ConnectionEvents.HandshakeEvent; import static io.lettuce.core.ConnectionEvents.Reset; import java.io.IOException; @@ -33,6 +31,7 @@ import io.lettuce.core.output.CommandOutput; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.tracing.TraceContext; +import io.lettuce.core.tracing.TraceContextProvider; import io.lettuce.core.tracing.Tracer; import io.lettuce.core.tracing.Tracing; import io.netty.buffer.ByteBuf; @@ -162,6 +161,11 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { logger.debug("{} channelRegistered()", logPrefix()); } + tracedEndpoint = clientResources.tracing().createEndpoint(ctx.channel().remoteAddress()); + logPrefix = null; + pristine = true; + fallbackCommand = null; + setState(LifecycleState.REGISTERED); buffer = ctx.alloc().directBuffer(8192 * 8); @@ -207,13 +211,6 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc channel.config().setAutoRead(true); } else if (evt instanceof Reset) { reset(); - } else if (evt instanceof HandshakeEvent) { - - HandshakeEvent handshake = (HandshakeEvent) evt; - - stack.addFirst(handshake.getCommand()); - ctx.writeAndFlush(handshake.getCommand()); - return; } super.userEventTriggered(ctx, evt); @@ -265,11 +262,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - tracedEndpoint = clientResources.tracing().createEndpoint(ctx.channel().remoteAddress()); - logPrefix = null; - pristine = true; - fallbackCommand = null; - if (debugEnabled) { logger.debug("{} channelActive()", logPrefix()); } @@ -277,13 +269,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { setState(LifecycleState.CONNECTED); endpoint.notifyChannelActive(ctx.channel()); - super.channelActive(ctx); - if (channel != null) { - channel.eventLoop().submit((Runnable) () -> channel.pipeline().fireUserEventTriggered(new Activated())); - } - if (debugEnabled) { logger.debug("{} channelActive() done", logPrefix()); } @@ -385,10 +372,10 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand if (tracingEnabled && command instanceof CompleteableCommand) { - TracedCommand provider = CommandWrapper.unwrap(command, TracedCommand.class); + TracedCommand traced = CommandWrapper.unwrap(command, TracedCommand.class); + TraceContextProvider provider = (traced == null ? clientResources.tracing().initialTraceContextProvider() : traced); Tracer tracer = clientResources.tracing().getTracerProvider().getTracer(); - TraceContext context = (provider == null ? clientResources.tracing().initialTraceContextProvider() : provider) - .getTraceContext(); + TraceContext context = provider.getTraceContext(); Tracer.Span span = tracer.nextSpan(context); span.name(command.getType().name()); @@ -399,7 +386,10 @@ private void writeSingleCommand(ChannelHandlerContext ctx, RedisCommand span.remoteEndpoint(tracedEndpoint); span.start(); - provider.setSpan(span); + + if (traced != null) { + traced.setSpan(span); + } CompleteableCommand completeableCommand = (CompleteableCommand) command; completeableCommand.onComplete((o, throwable) -> { diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionInitializer.java b/src/main/java/io/lettuce/core/protocol/ConnectionInitializer.java new file mode 100644 index 0000000000..35dee02322 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/ConnectionInitializer.java @@ -0,0 +1,38 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import java.util.concurrent.CompletionStage; + +import io.netty.channel.Channel; + +/** + * Initialize a connection to prepare it for usage. + * + * @author Mark Paluch + * @since 6.0 + */ +public interface ConnectionInitializer { + + /** + * Initialize the connection for usage. This method is invoked after establishing the transport connection and before the + * connection is used for user-space commands. + * + * @param channel the {@link Channel} to initialize. + * @return the {@link CompletionStage} that completes once the channel is fully initialized. + */ + CompletionStage initialize(Channel channel); +} diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 42502548c0..f45e15d7be 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -128,19 +128,6 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo resetReconnectDelay(); } - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - - logger.debug("{} userEventTriggered(ctx, {})", logPrefix(), evt); - - if (evt instanceof ConnectionEvents.Activated) { - attempts = 0; - resetReconnectDelay(); - } - - super.userEventTriggered(ctx, evt); - } - void prepareClose() { setListenOnChannelInactive(false); @@ -162,6 +149,8 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { reconnectScheduleTimeout = null; logPrefix = null; remoteAddress = channel.remoteAddress(); + attempts = 0; + resetReconnectDelay(); logPrefix = null; logger.debug("{} channelActive()", logPrefix()); diff --git a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java index 4511e6cb79..297526d9f2 100644 --- a/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java +++ b/src/main/java/io/lettuce/core/protocol/ReconnectionHandler.java @@ -18,22 +18,21 @@ import java.net.ConnectException; import java.net.SocketAddress; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; import io.lettuce.core.ClientOptions; -import io.lettuce.core.RedisChannelInitializer; import io.lettuce.core.RedisCommandTimeoutException; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceSets; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -51,13 +50,8 @@ class ReconnectionHandler { private final ClientOptions clientOptions; private final Bootstrap bootstrap; private final Mono socketAddressSupplier; - private final Timer timer; - private final ExecutorService reconnectWorkers; private final ConnectionFacade connectionFacade; - private TimeUnit timeoutUnit = TimeUnit.SECONDS; - private long timeout = 60; - private volatile CompletableFuture currentFuture; private volatile boolean reconnectSuspended; @@ -73,8 +67,6 @@ class ReconnectionHandler { this.socketAddressSupplier = socketAddressSupplier; this.bootstrap = bootstrap; this.clientOptions = clientOptions; - this.timer = timer; - this.reconnectWorkers = reconnectWorkers; this.connectionFacade = connectionFacade; } @@ -114,7 +106,6 @@ protected Tuple2, CompletableFuture> r private void reconnect0(CompletableFuture result, SocketAddress remoteAddress) { ChannelFuture connectFuture = bootstrap.connect(remoteAddress); - ChannelPromise initFuture = connectFuture.channel().newPromise(); logger.debug("Reconnecting to Redis at {}", remoteAddress); @@ -122,103 +113,57 @@ private void reconnect0(CompletableFuture result, SocketAddress remoteA if (t instanceof CancellationException) { connectFuture.cancel(true); - initFuture.cancel(true); - } - }); - - initFuture.addListener((ChannelFuture it) -> { - - if (it.cause() != null) { - - connectFuture.cancel(true); - close(it.channel()); - result.completeExceptionally(it.cause()); - } else { - result.complete(connectFuture.channel()); } }); - connectFuture.addListener((ChannelFuture it) -> { + connectFuture.addListener(future -> { - if (it.cause() != null) { - - initFuture.tryFailure(it.cause()); + if (!future.isSuccess()) { + result.completeExceptionally(future.cause()); return; } - ChannelPipeline pipeline = it.channel().pipeline(); - RedisChannelInitializer channelInitializer = pipeline.get(RedisChannelInitializer.class); - - if (channelInitializer == null) { + RedisHandshakeHandler handshakeHandler = connectFuture.channel().pipeline().get(RedisHandshakeHandler.class); - initFuture.tryFailure(new IllegalStateException( - "Reconnection attempt without a RedisChannelInitializer in the channel pipeline")); + if (handshakeHandler == null) { + result.completeExceptionally(new IllegalStateException("RedisHandshakeHandler not registered")); return; } - channelInitializer.channelInitialized().whenComplete( - (state, throwable) -> { + handshakeHandler.channelInitialized().whenComplete((success, throwable) -> { - if (throwable != null) { + if (throwable != null) { - if (isExecutionException(throwable)) { - initFuture.tryFailure(throwable); - return; - } + if (isExecutionException(throwable)) { + result.completeExceptionally(throwable); + return; + } - if (clientOptions.isCancelCommandsOnReconnectFailure()) { - connectionFacade.reset(); - } + if (clientOptions.isCancelCommandsOnReconnectFailure()) { + connectionFacade.reset(); + } - if (clientOptions.isSuspendReconnectOnProtocolFailure()) { + if (clientOptions.isSuspendReconnectOnProtocolFailure()) { - logger.error("Disabling autoReconnect due to initialization failure", throwable); - setReconnectSuspended(true); - } + logger.error("Disabling autoReconnect due to initialization failure", throwable); + setReconnectSuspended(true); + } - initFuture.tryFailure(throwable); + result.completeExceptionally(throwable); + return; + } - return; - } + if (logger.isDebugEnabled()) { + logger.info("Reconnected to {}, Channel {}", remoteAddress, + ChannelLogDescriptor.logDescriptor(connectFuture.channel())); + } else { + logger.info("Reconnected to {}", remoteAddress); + } - if (logger.isDebugEnabled()) { - logger.info("Reconnected to {}, Channel {}", remoteAddress, - ChannelLogDescriptor.logDescriptor(it.channel())); - } else { - logger.info("Reconnected to {}", remoteAddress); - } + result.complete(connectFuture.channel()); + }); - initFuture.trySuccess(); - }); }); - - Runnable timeoutAction = () -> { - initFuture.tryFailure(new TimeoutException(String.format("Reconnection attempt exceeded timeout of %d %s ", - timeout, timeoutUnit))); - }; - - Timeout timeoutHandle = timer.newTimeout(it -> { - - if (connectFuture.isDone() && initFuture.isDone()) { - return; - } - - if (reconnectWorkers.isShutdown()) { - timeoutAction.run(); - return; - } - - reconnectWorkers.submit(timeoutAction); - - }, this.timeout, timeoutUnit); - - initFuture.addListener(it -> timeoutHandle.cancel()); - } - - private void close(Channel channel) { - if (channel != null) { - channel.close(); - } } boolean isReconnectSuspended() { @@ -229,14 +174,6 @@ void setReconnectSuspended(boolean reconnectSuspended) { this.reconnectSuspended = reconnectSuspended; } - long getTimeout() { - return timeout; - } - - void setTimeout(long timeout) { - this.timeout = timeout; - } - void prepareClose() { CompletableFuture currentFuture = this.currentFuture; diff --git a/src/main/java/io/lettuce/core/protocol/RedisHandshakeHandler.java b/src/main/java/io/lettuce/core/protocol/RedisHandshakeHandler.java new file mode 100644 index 0000000000..773d55bf6e --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/RedisHandshakeHandler.java @@ -0,0 +1,157 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import java.net.SocketAddress; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import io.lettuce.core.ExceptionFactory; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.resource.ClientResources; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.local.LocalAddress; +import io.netty.util.Timeout; + +/** + * Handler to initialize a Redis Connection using a {@link ConnectionInitializer}. + * + * @author Mark Paluch + * @since 6.0 + */ +public class RedisHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final ConnectionInitializer connectionInitializer; + private final ClientResources clientResources; + private final Duration initializeTimeout; + + private final CompletableFuture handshakeFuture = new CompletableFuture<>(); + + public RedisHandshakeHandler(ConnectionInitializer connectionInitializer, ClientResources clientResources, + Duration initializeTimeout) { + this.connectionInitializer = connectionInitializer; + this.clientResources = clientResources; + this.initializeTimeout = initializeTimeout; + } + + @Override + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + + Runnable timeoutGuard = () -> { + + if (handshakeFuture.isDone()) { + return; + } + + fail(ctx, ExceptionFactory.createTimeoutException("Connection initialization timed out", initializeTimeout)); + }; + + Timeout timeoutHandle = clientResources.timer().newTimeout(t -> { + + if (clientResources.eventExecutorGroup().isShuttingDown()) { + timeoutGuard.run(); + return; + } + + clientResources.eventExecutorGroup().submit(timeoutGuard); + }, initializeTimeout.toNanos(), TimeUnit.NANOSECONDS); + + handshakeFuture.thenAccept(ignore -> { + timeoutHandle.cancel(); + }); + + super.channelRegistered(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + + if (!handshakeFuture.isDone()) { + fail(ctx, new RedisConnectionException("Connection closed prematurely")); + } + + super.channelInactive(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + + CompletionStage future = connectionInitializer.initialize(ctx.channel()); + + future.whenComplete((ignore, throwable) -> { + + if (throwable != null) { + fail(ctx, throwable); + } else { + ctx.fireChannelActive(); + succeed(); + } + }); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + + if (!handshakeFuture.isDone()) { + fail(ctx, cause); + } + + super.exceptionCaught(ctx, cause); + } + + /** + * Complete the handshake future successfully. + */ + protected void succeed() { + handshakeFuture.complete(null); + } + + /** + * Complete the handshake future with an error and close the channel.. + */ + protected void fail(ChannelHandlerContext ctx, Throwable cause) { + + ctx.close().addListener(closeFuture -> { + handshakeFuture.completeExceptionally(cause); + }); + } + + /** + * @return future to synchronize channel initialization. Returns a new future for every reconnect. + */ + public CompletionStage channelInitialized() { + return handshakeFuture; + } + + static SocketAddress remote(ChannelHandlerContext ctx) { + if (ctx.channel() != null && ctx.channel().remoteAddress() != null) { + return ctx.channel().remoteAddress(); + } + return new LocalAddress("unknown"); + } + + static SocketAddress local(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + if (channel != null && channel.localAddress() != null) { + return channel.localAddress(); + } + return LocalAddress.ANY; + } +} diff --git a/src/main/java/io/lettuce/core/protocol/SslRedisHandshakeHandler.java b/src/main/java/io/lettuce/core/protocol/SslRedisHandshakeHandler.java new file mode 100644 index 0000000000..2c2aeb7d5b --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/SslRedisHandshakeHandler.java @@ -0,0 +1,58 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import java.time.Duration; + +import io.lettuce.core.resource.ClientResources; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.ssl.SslHandshakeCompletionEvent; + +/** + * Handler to initialize a secure Redis Connection using a {@link ConnectionInitializer}. Delays channel activation to after the + * SSL handshake. + * + * @author Mark Paluch + * @since 6.0 + */ +public class SslRedisHandshakeHandler extends RedisHandshakeHandler { + + public SslRedisHandshakeHandler(ConnectionInitializer connectionInitializer, ClientResources clientResources, + Duration initializeTimeout) { + super(connectionInitializer, clientResources, initializeTimeout); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + + if (evt instanceof SslHandshakeCompletionEvent) { + + SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt; + if (event.isSuccess()) { + super.channelActive(ctx); + } else { + fail(ctx, event.cause()); + } + } + + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + // do not propagate channel active when using SSL. + } +} diff --git a/src/main/java/io/lettuce/core/sentinel/StatefulRedisSentinelConnectionImpl.java b/src/main/java/io/lettuce/core/sentinel/StatefulRedisSentinelConnectionImpl.java index d7ae64d367..5f882f2b4b 100644 --- a/src/main/java/io/lettuce/core/sentinel/StatefulRedisSentinelConnectionImpl.java +++ b/src/main/java/io/lettuce/core/sentinel/StatefulRedisSentinelConnectionImpl.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Collection; +import io.lettuce.core.ConnectionState; import io.lettuce.core.RedisChannelHandler; import io.lettuce.core.RedisChannelWriter; import io.lettuce.core.codec.RedisCodec; @@ -32,15 +33,15 @@ /** * @author Mark Paluch */ -public class StatefulRedisSentinelConnectionImpl extends RedisChannelHandler implements - StatefulRedisSentinelConnection { +public class StatefulRedisSentinelConnectionImpl extends RedisChannelHandler + implements StatefulRedisSentinelConnection { protected final RedisCodec codec; protected final RedisSentinelCommands sync; protected final RedisSentinelAsyncCommands async; protected final RedisSentinelReactiveCommands reactive; - private String clientName; + private final ConnectionState connectionState = new ConnectionState(); public StatefulRedisSentinelConnectionImpl(RedisChannelWriter writer, RedisCodec codec, Duration timeout) { @@ -77,23 +78,21 @@ public RedisSentinelReactiveCommands reactive() { return reactive; } - @Override - public void activated() { - - super.activated(); - - if (clientName != null) { - setClientName(clientName); - } - } - + /** + * @param clientName + * @deprecated since 6.0, use {@link RedisSentinelAsyncCommands#clientSetname(Object)}. + */ public void setClientName(String clientName) { CommandArgs args = new CommandArgs<>(StringCodec.UTF8).add(CommandKeyword.SETNAME).addValue(clientName); AsyncCommand async = new AsyncCommand<>( new Command<>(CommandType.CLIENT, new StatusOutput<>(StringCodec.UTF8), args)); - this.clientName = clientName; + connectionState.setClientName(clientName); dispatch((RedisCommand) async); } + + public ConnectionState getConnectionState() { + return connectionState; + } } diff --git a/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java b/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java index 0b8f9dd019..827d020b97 100644 --- a/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/ConnectionCommandIntegrationTests.java @@ -23,9 +23,9 @@ import javax.inject.Inject; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.test.util.ReflectionTestUtils; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; @@ -49,6 +49,11 @@ class ConnectionCommandIntegrationTests extends TestSupport { this.redis = connection.sync(); } + @BeforeEach + void setUp() { + redis.flushall(); + } + @Test void auth() { @@ -70,7 +75,6 @@ void auth() { authConnection.ping(); authConnection.getStatefulConnection().close(); }); - } @Test @@ -144,8 +148,9 @@ void authInvalidPassword() { fail("Authenticated with invalid password"); } catch (RedisException e) { assertThat(e.getMessage()).startsWith("ERR").contains("AUTH"); - StatefulRedisConnection statefulRedisCommands = async.getStatefulConnection(); - assertThat(ReflectionTestUtils.getField(statefulRedisCommands, "password")).isNull(); + StatefulRedisConnectionImpl statefulRedisCommands = (StatefulRedisConnectionImpl) async + .getStatefulConnection(); + assertThat(statefulRedisCommands.getConnectionState()).extracting("password").isNull(); } finally { async.getStatefulConnection().close(); } @@ -159,8 +164,9 @@ void selectInvalid() { fail("Selected invalid db index"); } catch (RedisException e) { assertThat(e.getMessage()).startsWith("ERR"); - StatefulRedisConnection statefulRedisCommands = async.getStatefulConnection(); - assertThat(ReflectionTestUtils.getField(statefulRedisCommands, "db")).isEqualTo(0); + StatefulRedisConnectionImpl statefulRedisCommands = (StatefulRedisConnectionImpl) async + .getStatefulConnection(); + assertThat(statefulRedisCommands.getConnectionState()).extracting("db").isEqualTo(0); } finally { async.getStatefulConnection().close(); } diff --git a/src/test/java/io/lettuce/core/cluster/RedisStateIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/CommandSetIntegrationTests.java similarity index 91% rename from src/test/java/io/lettuce/core/cluster/RedisStateIntegrationTests.java rename to src/test/java/io/lettuce/core/cluster/CommandSetIntegrationTests.java index b9b90ec630..96186ad4ac 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisStateIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/CommandSetIntegrationTests.java @@ -36,21 +36,20 @@ * @author Mark Paluch */ @ExtendWith(LettuceExtension.class) -public class RedisStateIntegrationTests { +public class CommandSetIntegrationTests { private final RedisCommands redis; @Inject - RedisStateIntegrationTests(StatefulRedisConnection connection) { + CommandSetIntegrationTests(StatefulRedisConnection connection) { this.redis = connection.sync(); } @Test void shouldDiscoverCommands() { - List commandDetails = CommandDetailParser.parse(redis.command()); - RedisState state = new RedisState(commandDetails); + CommandSet state = new CommandSet(commandDetails); assertThat(state.hasCommand(CommandType.GEOADD)).isTrue(); assertThat(state.hasCommand(UnknownCommand.FOO)).isFalse(); diff --git a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java index c4205b9418..84706cf8d3 100644 --- a/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/RedisClusterClientIntegrationTests.java @@ -35,7 +35,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.springframework.test.util.ReflectionTestUtils; import io.lettuce.core.*; import io.lettuce.core.api.StatefulRedisConnection; @@ -517,11 +516,11 @@ void readOnlyOnCluster() { TestFutures.awaitOrTimeout(connection.async().quit()); - assertThat(ReflectionTestUtils.getField(connection, "readOnly")).isEqualTo(Boolean.TRUE); + assertThat(connection).extracting("connectionState").extracting("readOnly").isEqualTo(Boolean.TRUE); sync.readWrite(); - assertThat(ReflectionTestUtils.getField(connection, "readOnly")).isEqualTo(Boolean.FALSE); + assertThat(connection).extracting("connectionState").extracting("readOnly").isEqualTo(Boolean.FALSE); RedisClusterClient clusterClient = RedisClusterClient.create(TestClientResources.get(), RedisURI.Builder.redis(host, 40400).build()); try { diff --git a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java index a3c2eae202..24c82e0f67 100644 --- a/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/protocol/CommandHandlerUnitTests.java @@ -45,7 +45,6 @@ import org.springframework.test.util.ReflectionTestUtils; import io.lettuce.core.ClientOptions; -import io.lettuce.core.ConnectionEvents; import io.lettuce.core.RedisException; import io.lettuce.core.codec.StringCodec; import io.lettuce.core.metrics.CommandLatencyCollector; @@ -142,15 +141,6 @@ void before() throws Exception { stack = (Queue) ReflectionTestUtils.getField(sut, "stack"); } - @Test - void testChannelActive() throws Exception { - sut.channelRegistered(context); - - sut.channelActive(context); - - verify(pipeline).fireUserEventTriggered(any(ConnectionEvents.Activated.class)); - } - @Test void testExceptionChannelActive() throws Exception { sut.setState(CommandHandler.LifecycleState.ACTIVE); diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandHandlerUnitTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandHandlerUnitTests.java index de97352ca9..288bd3a9ee 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandHandlerUnitTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandHandlerUnitTests.java @@ -278,7 +278,7 @@ void shouldCompleteUnsubscribe() throws Exception { doAnswer((Answer>) inv -> { PubSubOutput out = inv.getArgument(0); if (out.type() == PubSubOutput.Type.message) { - throw new NullPointerException(); + throw new NullPointerException("Expected exception"); } return endpoint; }).when(endpoint).notifyMessage(any()); diff --git a/src/test/java/io/lettuce/core/tracing/BraveTracingIntegrationTests.java b/src/test/java/io/lettuce/core/tracing/BraveTracingIntegrationTests.java index 4ddaea0fcd..6f6977a030 100644 --- a/src/test/java/io/lettuce/core/tracing/BraveTracingIntegrationTests.java +++ b/src/test/java/io/lettuce/core/tracing/BraveTracingIntegrationTests.java @@ -98,8 +98,8 @@ void pingWithTrace() { List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("ping"); - assertThat(spans.get(1).name()).isEqualTo("foo"); + assertThat(spans.get(0).name()).isEqualTo("hello"); + assertThat(spans.get(1).name()).isEqualTo("ping"); } @Test @@ -114,17 +114,17 @@ void pingWithTraceShouldCatchErrors() { } catch (Exception e) { } - Wait.untilEquals(2, spans::size).waitOrTimeout(); + Wait.untilTrue(() -> spans.size() > 2).waitOrTimeout(); foo.finish(); List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("set"); - assertThat(spans.get(1).name()).isEqualTo("hgetall"); - assertThat(spans.get(1).tags()).containsEntry("error", + assertThat(spans.get(1).name()).isEqualTo("set"); + assertThat(spans.get(2).name()).isEqualTo("hgetall"); + assertThat(spans.get(2).tags()).containsEntry("error", "WRONGTYPE Operation against a key holding the wrong kind of value"); - assertThat(spans.get(2).name()).isEqualTo("foo"); + assertThat(spans.get(3).name()).isEqualTo("foo"); } @Test @@ -140,17 +140,17 @@ void getAndSetWithTraceWithCommandArgsExcludedFromTags() { connect.sync().set("foo", "bar"); connect.sync().get("foo"); - Wait.untilEquals(2, spans::size).waitOrTimeout(); + Wait.untilTrue(() -> spans.size() > 2).waitOrTimeout(); trace.finish(); List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("set"); - assertThat(spans.get(0).tags()).doesNotContainKey("redis.args"); - assertThat(spans.get(1).name()).isEqualTo("get"); + assertThat(spans.get(1).name()).isEqualTo("set"); assertThat(spans.get(1).tags()).doesNotContainKey("redis.args"); - assertThat(spans.get(2).name()).isEqualTo("foo"); + assertThat(spans.get(2).name()).isEqualTo("get"); + assertThat(spans.get(2).tags()).doesNotContainKey("redis.args"); + assertThat(spans.get(3).name()).isEqualTo("foo"); FastShutdown.shutdown(client); FastShutdown.shutdown(clientResources); @@ -183,8 +183,8 @@ void reactivePingWithTrace() { List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("ping"); - assertThat(spans.get(1).name()).isEqualTo("foo"); + assertThat(spans.get(1).name()).isEqualTo("ping"); + assertThat(spans.get(2).name()).isEqualTo("foo"); } @Test @@ -199,17 +199,17 @@ void reactiveGetAndSetWithTrace() { .as(StepVerifier::create) // .expectNext("bar").verifyComplete(); - Wait.untilEquals(2, spans::size).waitOrTimeout(); + Wait.untilTrue(() -> spans.size() > 2).waitOrTimeout(); trace.finish(); List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("set"); - assertThat(spans.get(0).tags()).containsEntry("redis.args", "key value"); - assertThat(spans.get(1).name()).isEqualTo("get"); - assertThat(spans.get(1).tags()).containsEntry("redis.args", "key"); - assertThat(spans.get(2).name()).isEqualTo("foo"); + assertThat(spans.get(1).name()).isEqualTo("set"); + assertThat(spans.get(1).tags()).containsEntry("redis.args", "key value"); + assertThat(spans.get(2).name()).isEqualTo("get"); + assertThat(spans.get(2).tags()).containsEntry("redis.args", "key"); + assertThat(spans.get(3).name()).isEqualTo("foo"); } @Test @@ -227,13 +227,13 @@ void reactiveGetAndSetWithTraceProvider() { .as(StepVerifier::create) // .expectNext("bar").verifyComplete(); - Wait.untilEquals(2, spans::size).waitOrTimeout(); + Wait.untilTrue(() -> spans.size() > 2).waitOrTimeout(); trace.finish(); List spans = new ArrayList<>(BraveTracingIntegrationTests.spans); - assertThat(spans.get(0).name()).isEqualTo("set"); - assertThat(spans.get(1).name()).isEqualTo("get"); + assertThat(spans.get(1).name()).isEqualTo("set"); + assertThat(spans.get(2).name()).isEqualTo("get"); } }