Skip to content

Commit

Permalink
Polishing #2045
Browse files Browse the repository at this point in the history
Use thenApplyAsync(…) instead of supplyAsync(…) to reduce allocations. Adopt tests.

Original pull request: #2048.
  • Loading branch information
mp911de committed Mar 18, 2022
1 parent ae99229 commit 91fb351
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public CompletionStage<Map<RedisURI, Partitions>> loadViews(Iterable<RedisURI> s
Requests requestedTopology = connections.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS);
Requests requestedInfo = connections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS);
return CompletableFuture.allOf(requestedTopology.allCompleted(), requestedInfo.allCompleted())
.thenCompose(ignore -> getNodeSpecificViewsAsync(requestedTopology, requestedInfo))
.thenApplyAsync(ignore -> getNodeSpecificViews(requestedTopology, requestedInfo),
clientResources.eventExecutorGroup())
.thenCompose(views -> {
if (discovery && isEventLoopActive()) {

Expand All @@ -124,11 +125,12 @@ public CompletionStage<Map<RedisURI, Partitions>> loadViews(Iterable<RedisURI> s

Requests additionalTopology = newConnections
.requestTopology(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedTopology);
Requests additionalClients = newConnections
.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS).mergeWith(requestedInfo);
Requests additionalInfo = newConnections.requestInfo(commandTimeoutNs, TimeUnit.NANOSECONDS)
.mergeWith(requestedInfo);
return CompletableFuture
.allOf(additionalTopology.allCompleted(), additionalClients.allCompleted())
.thenCompose(ignore2 -> getNodeSpecificViewsAsync(additionalTopology, additionalClients));
.allOf(additionalTopology.allCompleted(), additionalInfo.allCompleted())
.thenApplyAsync(ignore2 -> getNodeSpecificViews(additionalTopology, additionalInfo),
clientResources.eventExecutorGroup());
});
}

Expand Down Expand Up @@ -281,13 +283,6 @@ NodeTopologyViews getNodeSpecificViews(Requests requestedTopology, Requests requ
return new NodeTopologyViews(views);
}

private CompletableFuture<NodeTopologyViews> getNodeSpecificViewsAsync(Requests requestedTopology, Requests requestedInfo) {
// use computation thread pool
// ref: https://github.com/lettuce-io/lettuce-core/issues/2045
return CompletableFuture.supplyAsync(() -> getNodeSpecificViews(requestedTopology, requestedInfo),
clientResources.eventExecutorGroup());
}

private static boolean validNode(RedisClusterNode redisClusterNode) {

if (redisClusterNode.is(RedisClusterNode.NodeFlag.NOADDR)) {
Expand Down Expand Up @@ -376,7 +371,7 @@ private boolean isEventLoopActive() {

private static Set<RedisURI> difference(Set<RedisURI> allKnown, Set<RedisURI> seed) {

Set<RedisURI> result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE);
Set<RedisURI> result = new TreeSet<>(TopologyComparators.RedisURIComparator.INSTANCE);

for (RedisURI e : allKnown) {
if (!seed.contains(e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class ClusterTopologyRefreshUnitTests {

private static final String NODE_1_VIEW = "1 127.0.0.1:7380 master,myself - 0 1401258245007 2 disconnected 8000-11999\n"
+ "2 127.0.0.1:7381 master - 111 1401258245007 222 connected 7000 12000 12002-16383\n";

private static final String NODE_2_VIEW = "1 127.0.0.1:7380 master - 0 1401258245007 2 disconnected 8000-11999\n"
+ "2 127.0.0.1:7381 master,myself - 111 1401258245007 222 connected 7000 12000 12002-16383\n";

Expand Down Expand Up @@ -114,6 +115,11 @@ void before() {
when(clientResources.timer()).thenReturn(timer);
when(clientResources.socketAddressResolver()).thenReturn(SocketAddressResolver.create(DnsResolver.unresolved()));
when(clientResources.eventExecutorGroup()).thenReturn(eventExecutors);
doAnswer(invocation -> {
((Runnable) invocation.getArgument(0)).run();
return null;
}).when(eventExecutors).execute(any(Runnable.class));

when(connection1.async()).thenReturn(asyncCommands1);
when(connection2.async()).thenReturn(asyncCommands2);
when(connection1.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
Expand Down Expand Up @@ -299,10 +305,10 @@ void shouldAttemptToConnectOnlyOnce() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedWithException(new RedisException("connection failed")));
.thenReturn(completedWithException(new RedisException("connection failed")));

sut.loadViews(seed, Duration.ofSeconds(1), true);

Expand Down Expand Up @@ -346,10 +352,10 @@ void shouldFailIfNoNodeConnects() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedWithException(new RedisException("connection failed")));
.thenReturn(completedWithException(new RedisException("connection failed")));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedWithException(new RedisException("connection failed")));
.thenReturn(completedWithException(new RedisException("connection failed")));

try {
sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();
Expand All @@ -373,10 +379,10 @@ void shouldShouldDiscoverNodes() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

sut.loadViews(seed, Duration.ofSeconds(1), true);

Expand All @@ -393,7 +399,7 @@ void shouldShouldNotDiscoverNodes() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));

sut.loadViews(seed, Duration.ofSeconds(1), false);

Expand All @@ -410,11 +416,11 @@ void shouldNotFailOnDuplicateSeedNodes() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

sut.loadViews(seed, Duration.ofSeconds(1), true);

Expand All @@ -431,10 +437,10 @@ void shouldCloseConnections() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

sut.loadViews(seed, Duration.ofSeconds(1), true);

Expand All @@ -449,7 +455,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingClientCount() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));

Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture()
.join();
Expand All @@ -469,18 +475,18 @@ void discoveredAdditionalNodesShouldBeOrderedUsingClientCount() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();

Partitions partitions = partitionsMap.values().iterator().next();

List<RedisClusterNode> nodes = TopologyComparators.sortByClientCount(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381),
seed.get(0));
}

Expand All @@ -491,7 +497,7 @@ void undiscoveredAdditionalNodesShouldBeLastUsingLatency() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));

Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), false).toCompletableFuture()
.join();
Expand All @@ -511,18 +517,18 @@ void discoveredAdditionalNodesShouldBeOrderedUsingLatency() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

Map<RedisURI, Partitions> partitionsMap = sut.loadViews(seed, Duration.ofSeconds(1), true).toCompletableFuture().join();

Partitions partitions = partitionsMap.values().iterator().next();

List<RedisClusterNode> nodes = TopologyComparators.sortByLatency(partitions);

assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).containsSequence(RedisURI.create("127.0.0.1", 7381),
assertThat(nodes).hasSize(2).extracting(RedisClusterNode::getUri).contains(RedisURI.create("127.0.0.1", 7381),
seed.get(0));
}

Expand All @@ -533,10 +539,10 @@ void shouldPropagateCommandFailures() {

when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7380))))
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
.thenReturn(completedFuture((StatefulRedisConnection) connection1));
when(nodeConnectionFactory.connectToNodeAsync(any(RedisCodec.class),
eq(InetSocketAddress.createUnresolved("127.0.0.1", 7381))))
.thenReturn(completedFuture((StatefulRedisConnection) connection2));
.thenReturn(completedFuture((StatefulRedisConnection) connection2));

reset(connection1, connection2);

Expand Down Expand Up @@ -618,4 +624,5 @@ private static <T> ConnectionFuture<T> completedWithException(Exception e) {

return ConnectionFuture.from(InetSocketAddress.createUnresolved(TestSettings.host(), TestSettings.port()), future);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ void adaptiveTopologyUpdateIsRateLimited() {
}

@Test
void adaptiveTopologyUpdatetUsesTimeout() {
void adaptiveTopologyUpdateUsesTimeout() {

ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()//
.adaptiveRefreshTriggersTimeout(500, TimeUnit.MILLISECONDS)//
Expand Down

0 comments on commit 91fb351

Please sign in to comment.