From 420dd1c9c7ec73cfd82daec65a8294edabad5ef8 Mon Sep 17 00:00:00 2001 From: Jack Cheng Date: Sat, 4 Jan 2025 11:07:45 -0800 Subject: [PATCH] renamed to max connection pools and updated unit test. --- .../netty/resources/ConnectionProvider.java | 16 +-- .../resources/PooledConnectionProvider.java | 8 +- .../netty/http/client/HttpClientTest.java | 124 ++++++++---------- 3 files changed, 65 insertions(+), 83 deletions(-) diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java index cdbd0c0f98..12e7e4c750 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/ConnectionProvider.java @@ -394,9 +394,9 @@ interface AllocationStrategy> { final class Builder extends ConnectionPoolSpec { static final Duration DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED = Duration.ZERO; - static final int EXPECTED_CONNECTION_POOLS_DISABLED = -1; + static final int MAX_CONNECTION_POOLS = -1; - int expectedConnectionPools = EXPECTED_CONNECTION_POOLS_DISABLED; + int maxConnectionPools = MAX_CONNECTION_POOLS; String name; Duration inactivePoolDisposeInterval = DISPOSE_INACTIVE_POOLS_IN_BACKGROUND_DISABLED; @@ -420,7 +420,7 @@ private Builder(String name) { this.inactivePoolDisposeInterval = copy.inactivePoolDisposeInterval; this.poolInactivity = copy.poolInactivity; this.disposeTimeout = copy.disposeTimeout; - this.expectedConnectionPools = copy.expectedConnectionPools; + this.maxConnectionPools = copy.maxConnectionPools; copy.confPerRemoteHost.forEach((address, spec) -> this.confPerRemoteHost.put(address, new ConnectionPoolSpec<>(spec))); } @@ -493,15 +493,15 @@ public final Builder forRemoteHost(SocketAddress remoteHost, Consumer implements final Duration inactivePoolDisposeInterval; final Duration poolInactivity; final Duration disposeTimeout; - final int expectedConnectionPools; + final int maxConnectionPools; final AtomicInteger connectionPoolCount = new AtomicInteger(0); final Map maxConnections = new HashMap<>(); Mono onDispose; @@ -108,7 +108,7 @@ protected PooledConnectionProvider(Builder builder) { this.inactivePoolDisposeInterval = builder.inactivePoolDisposeInterval; this.poolInactivity = builder.poolInactivity; this.disposeTimeout = builder.disposeTimeout; - this.expectedConnectionPools = builder.expectedConnectionPools; + this.maxConnectionPools = builder.maxConnectionPools; this.defaultPoolFactory = new PoolFactory<>(builder, builder.disposeTimeout, clock); for (Map.Entry> entry : builder.confPerRemoteHost.entrySet()) { poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory<>(entry.getValue(), builder.disposeTimeout)); @@ -136,10 +136,10 @@ public final Mono acquire( log.debug("Creating a new [{}] client pool [{}] for [{}]", name, poolFactory, remoteAddress); } - if (expectedConnectionPools > Builder.EXPECTED_CONNECTION_POOLS_DISABLED && connectionPoolCount.incrementAndGet() > expectedConnectionPools) { + if (maxConnectionPools > Builder.MAX_CONNECTION_POOLS && connectionPoolCount.incrementAndGet() > maxConnectionPools) { if (log.isWarnEnabled()) { log.warn("Connection pool creation limit exceeded: {} pools created, maximum expected is {}", connectionPoolCount.get(), - expectedConnectionPools); + maxConnectionPools); } } diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java index 7d8ca2911f..03d19902e1 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/HttpClientTest.java @@ -618,83 +618,65 @@ void sslExchangeRelativeGet() throws SSLException { assertThat(responseString).isEqualTo("hello /foo"); } - @Test - void expectedConnectionPoolsEnabled() throws SSLException { - ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); - Logger spyLogger = Mockito.spy(log); - Loggers.useCustomLoggers(s -> spyLogger); - - SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) - .build(); - - - disposableServer = - createServer() - .secure(ssl -> ssl.sslContext(sslServer)) - .handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri()))) - .bindNow(); - - ConnectionProvider connectionProvider = ConnectionProvider.builder("expected-connection-pool").expectedConnectionPools(1).build(); - - StepVerifier.create( - Flux.range(1, 2) - .flatMap(i -> createClient(connectionProvider, disposableServer::address) - .secure(ssl -> ssl.sslContext(createClientSslContext())) - .get() - .uri("/foo") - .responseContent() - .aggregate() - .asString())) - .thenConsumeWhile(s -> true) - .verifyComplete(); - - Loggers.resetLoggerFactory(); - - - Mockito.verify(spyLogger).warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1)); - assertThat(argumentCaptor.getValue()).isEqualTo("Connection pool creation limit exceeded: {} pools created, maximum expected is {}"); - - connectionProvider.dispose(); - disposableServer.dispose(); - - } - - @Test - void expectedConnectionPoolsNotEnabled() throws SSLException { - Logger spyLogger = Mockito.spy(log); - Loggers.useCustomLoggers(s -> spyLogger); - - SslContext sslServer = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()) - .build(); - - - disposableServer = - createServer() - .secure(ssl -> ssl.sslContext(sslServer)) - .handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri()))) - .bindNow(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void maxConnectionPools(boolean withMaxConnectionPools) throws SSLException { - ConnectionProvider connectionProvider = ConnectionProvider.builder("max-connection-pools").build(); + try { + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(String.class); + Logger spyLogger = Mockito.spy(log); + Loggers.useCustomLoggers(s -> spyLogger); - StepVerifier.create( - Flux.range(1, 2) - .flatMap(i -> createClient(connectionProvider, disposableServer::address) - .secure(ssl -> ssl.sslContext(createClientSslContext())) - .get() - .uri("/foo") - .responseContent() - .aggregate() - .asString())) - .thenConsumeWhile(s -> true) - .verifyComplete(); + SslContext sslServer = SslContextBuilder + .forServer(ssc.certificate(), ssc.privateKey()) + .build(); - Loggers.resetLoggerFactory(); + disposableServer = createServer() + .secure(ssl -> ssl.sslContext(sslServer)) + .handle((req, resp) -> resp.sendString(Flux.just("hello ", req.uri()))) + .bindNow(); + ConnectionProvider connectionProvider = withMaxConnectionPools ? ConnectionProvider + .builder("max-connection-pools") + .maxConnectionPools(1) + .build() : ConnectionProvider + .builder("max-connection-pools") + .build(); - Mockito.verify(spyLogger, times(0)).warn(Mockito.eq("Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), Mockito.eq(2), Mockito.eq(1)); + StepVerifier + .create(Flux + .range(1, 2) + .flatMap(i -> createClient(connectionProvider, disposableServer::address) + .secure(ssl -> ssl.sslContext(createClientSslContext())) + .get() + .uri("/foo") + .responseContent() + .aggregate() + .asString())) + .thenConsumeWhile(s -> true) + .verifyComplete(); + + if (withMaxConnectionPools) { + Mockito + .verify(spyLogger) + .warn(argumentCaptor.capture(), Mockito.eq(2), Mockito.eq(1)); + assertThat(argumentCaptor.getValue()).isEqualTo( + "Connection pool creation limit exceeded: {} pools created, maximum expected is {}"); + } + else { + Mockito + .verify(spyLogger, times(0)) + .warn(Mockito.eq( + "Connection pool creation limit exceeded: {} pools created, maximum expected is {}"), + Mockito.eq(2), + Mockito.eq(1)); - connectionProvider.dispose(); - disposableServer.dispose(); + } + } + finally { + disposableServer.dispose(); + Loggers.resetLoggerFactory(); + } }