diff --git a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java index 52a61a47ea..029c6479ea 100644 --- a/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java +++ b/src/main/java/io/lettuce/core/cluster/topology/DefaultClusterTopologyRefresh.java @@ -105,7 +105,8 @@ public CompletionStage> loadViews(Iterable s Requests requestedTopology = connections.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS); Requests requestedInfo = connections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS); return CompletableFuture.allOf(requestedTopology.allCompleted(), requestedInfo.allCompleted()) - .thenCompose(ignore -> getNodeSpecificViewsAsync(requestedTopology, requestedInfo)) + .thenApplyAsync(ignore -> getNodeSpecificViews(requestedTopology, requestedInfo), + clientResources.eventExecutorGroup()) .thenCompose(views -> { if (discovery && isEventLoopActive()) { @@ -124,11 +125,12 @@ public CompletionStage> loadViews(Iterable s Requests additionalTopology = newConnections .requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedTopology); - Requests additionalClients = newConnections - .requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedInfo); + Requests additionalInfo = newConnections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS) + .mergeWith(requestedInfo); return CompletableFuture - .allOf(additionalTopology.allCompleted(), additionalClients.allCompleted()) - .thenCompose(ignore2 -> getNodeSpecificViewsAsync(additionalTopology, additionalClients)); + .allOf(additionalTopology.allCompleted(), additionalInfo.allCompleted()) + .thenApplyAsync(ignore2 -> getNodeSpecificViews(additionalTopology, additionalInfo), + clientResources.eventExecutorGroup()); }); } @@ -281,13 +283,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ return new NodeTopologyViews(views); } - private CompletableFuture getNodeSpecificViewsAsync(Requests requestedTopology, Requests requestedInfo) { - // use computation thread pool - // ref: https://github.com/lettuce-io/lettuce-core/issues/2045 - return CompletableFuture.supplyAsync(() -> getNodeSpecificViews(requestedTopology, requestedInfo), - clientResources.eventExecutorGroup()); - } - private static boolean validNode(RedisClusterNode redisClusterNode) { if (redisClusterNode.is(RedisClusterNode.NodeFlag.NOADDR)) { @@ -376,7 +371,7 @@ private boolean isEventLoopActive() { private static Set difference(Set allKnown, Set seed) { - Set result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE); + Set result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE); for (RedisURI e : allKnown) { if (!seed.contains(e)) { diff --git a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java index 19580e6313..834ad933e2 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java +++ b/src/test/java/io/lettuce/core/cluster/topology/ClusterTopologyRefreshUnitTests.java @@ -74,6 +74,7 @@ class ClusterTopologyRefreshUnitTests { private static final String NODE_1_VIEW = "1 127.0.0.1:7380 master,myself - 0 1401258245007 2 disconnected 8000-11999\n" + "2 127.0.0.1:7381 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; + private static final String NODE_2_VIEW = "1 127.0.0.1:7380 master - 0 1401258245007 2 disconnected 8000-11999\n" + "2 127.0.0.1:7381 master,myself - 111 1401258245007 222 connected 7000 12000 12002-16383\n"; @@ -114,6 +115,11 @@ void before() { when(clientResources.timer()).thenReturn(timer); when(clientResources.socketAddressResolver()).thenReturn(SocketAddressResolver.create(DnsResolver.unresolved())); when(clientResources.eventExecutorGroup()).thenReturn(eventExecutors); + doAnswer(invocation -> { + ((Runnable) invocation.getArgument(0)).run(); + return null; + }).when(eventExecutors).execute(any(Runnable.class)); + when(connection1.async()).thenReturn(asyncCommands1); when(connection2.async()).thenReturn(asyncCommands2); when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); @@ -299,10 +305,10 @@ void shouldAttemptToConnectOnlyOnce() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedWithException(new RedisException("connection failed"))); + .thenReturn(completedWithException(new RedisException("connection failed"))); sut.loadViews(seed, Duration.ofSeconds(1), true); @@ -346,10 +352,10 @@ void shouldFailIfNoNodeConnects() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedWithException(new RedisException("connection failed"))); + .thenReturn(completedWithException(new RedisException("connection failed"))); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedWithException(new RedisException("connection failed"))); + .thenReturn(completedWithException(new RedisException("connection failed"))); try { sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join(); @@ -373,10 +379,10 @@ void shouldShouldDiscoverNodes() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); sut.loadViews(seed, Duration.ofSeconds(1), true); @@ -393,7 +399,7 @@ void shouldShouldNotDiscoverNodes() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); sut.loadViews(seed, Duration.ofSeconds(1), false); @@ -410,11 +416,11 @@ void shouldNotFailOnDuplicateSeedNodes() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); sut.loadViews(seed, Duration.ofSeconds(1), true); @@ -431,10 +437,10 @@ void shouldCloseConnections() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); sut.loadViews(seed, Duration.ofSeconds(1), true); @@ -449,7 +455,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); Map partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture() .join(); @@ -469,10 +475,10 @@ void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); Map partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join(); @@ -480,7 +486,7 @@ void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() { List nodes = TopologyComparators.sortByClientCount(partitions); - assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381), + assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381), seed.get(0)); } @@ -491,7 +497,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingLatency() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); Map partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture() .join(); @@ -511,10 +517,10 @@ void discoveredAdditionalNodesShouldBeOrderedUsingLatency() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); Map partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join(); @@ -522,7 +528,7 @@ void discoveredAdditionalNodesShouldBeOrderedUsingLatency() { List nodes = TopologyComparators.sortByLatency(partitions); - assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381), + assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381), seed.get(0)); } @@ -533,10 +539,10 @@ void shouldPropagateCommandFailures() { when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection1)); + .thenReturn(completedFuture((StatefulRedisConnection) connection1)); when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class), eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381)))) - .thenReturn(completedFuture((StatefulRedisConnection) connection2)); + .thenReturn(completedFuture((StatefulRedisConnection) connection2)); reset(connection1, connection2); @@ -618,4 +624,5 @@ private static ConnectionFuture completedWithException(Exception e) { return ConnectionFuture.from(InetSocketAddress.createUnresolved(TestSettings.host(), TestSettings.port()), future); } + } diff --git a/src/test/java/io/lettuce/core/cluster/topology/TopologyRefreshIntegrationTests.java b/src/test/java/io/lettuce/core/cluster/topology/TopologyRefreshIntegrationTests.java index 4059de6ad6..b8b3e2b7cb 100644 --- a/src/test/java/io/lettuce/core/cluster/topology/TopologyRefreshIntegrationTests.java +++ b/src/test/java/io/lettuce/core/cluster/topology/TopologyRefreshIntegrationTests.java @@ -205,7 +205,7 @@ void adaptiveTopologyUpdateIsRateLimited() { } @Test - void adaptiveTopologyUpdatetUsesTimeout() { + void adaptiveTopologyUpdateUsesTimeout() { ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()// .adaptiveRefreshTriggersTimeout(500, TimeUnit.MILLISECONDS)//