From 088cb91e2d5c24f025fd0c3f1639c69961a4b5fd Mon Sep 17 00:00:00 2001 From: Gilboab <97948000+GilboaAWS@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:32:43 +0300 Subject: [PATCH] Added inflightRequestsLimit client config to java (#2408) * Add inflight request limit config to java Signed-off-by: GilboaAWS --- .../BaseClientConfiguration.java | 8 +++ .../GlideClientConfiguration.java | 1 + .../GlideClusterClientConfiguration.java | 1 + .../glide/managers/ConnectionManager.java | 4 ++ .../glide/managers/ConnectionManagerTest.java | 4 ++ .../test/java/glide/SharedClientTests.java | 60 +++++++++++++++++++ 6 files changed, 78 insertions(+) diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java index b6cc4e26ff..e0a4ed5500 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java @@ -66,4 +66,12 @@ public abstract class BaseClientConfiguration { private final ThreadPoolResource threadPoolResource; public abstract BaseSubscriptionConfiguration getSubscriptionConfiguration(); + + /** + * The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed). + * This limit is used to control the memory usage and prevent the client from overwhelming the + * server or getting stuck in case of a queue backlog. If not set, a default value of 1000 will be + * used. + */ + private final Integer inflightRequestsLimit; } diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java index edb7bbb326..83d84e7c1f 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClientConfiguration.java @@ -23,6 +23,7 @@ * .databaseId(1) * .clientName("GLIDE") * .subscriptionConfiguration(subscriptionConfiguration) + * .inflightRequestsLimit(1000) * .build(); * } */ diff --git a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java index 2e49e7b66d..b1d1c7590c 100644 --- a/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/GlideClusterClientConfiguration.java @@ -22,6 +22,7 @@ * .requestTimeout(2000) * .clientName("GLIDE") * .subscriptionConfiguration(subscriptionConfiguration) + * .inflightRequestsLimit(1000) * .build(); * } */ diff --git a/java/client/src/main/java/glide/managers/ConnectionManager.java b/java/client/src/main/java/glide/managers/ConnectionManager.java index 58328d375c..a5a8b9c5c3 100644 --- a/java/client/src/main/java/glide/managers/ConnectionManager.java +++ b/java/client/src/main/java/glide/managers/ConnectionManager.java @@ -118,6 +118,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration connectionRequestBuilder.setClientName(configuration.getClientName()); } + if (configuration.getInflightRequestsLimit() != null) { + connectionRequestBuilder.setInflightRequestsLimit(configuration.getInflightRequestsLimit()); + } + return connectionRequestBuilder; } diff --git a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java index 9a3ebe6e19..7a8f1a0d44 100644 --- a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java +++ b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java @@ -64,6 +64,8 @@ public class ConnectionManagerTest { private static final String CLIENT_NAME = "ClientName"; + private static final int INFLIGHT_REQUESTS_LIMIT = 1000; + @BeforeEach public void setUp() { channel = mock(ChannelHandler.class); @@ -149,6 +151,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() { .subscription(EXACT, gs("channel_2")) .subscription(PATTERN, gs("*chatRoom*")) .build()) + .inflightRequestsLimit(INFLIGHT_REQUESTS_LIMIT) .build(); ConnectionRequest expectedProtobufConnectionRequest = ConnectionRequest.newBuilder() @@ -193,6 +196,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() { ByteString.copyFrom(gs("*chatRoom*").getBytes())) .build())) .build()) + .setInflightRequestsLimit(INFLIGHT_REQUESTS_LIMIT) .build(); CompletableFuture completedFuture = new CompletableFuture<>(); Response response = Response.newBuilder().setConstantResponse(ConstantResponse.OK).build(); diff --git a/java/integTest/src/test/java/glide/SharedClientTests.java b/java/integTest/src/test/java/glide/SharedClientTests.java index bf106f1ff4..3650c079e3 100644 --- a/java/integTest/src/test/java/glide/SharedClientTests.java +++ b/java/integTest/src/test/java/glide/SharedClientTests.java @@ -6,18 +6,25 @@ import static glide.TestUtilities.getRandomString; import static glide.api.BaseClient.OK; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import glide.api.BaseClient; import glide.api.GlideClient; import glide.api.GlideClusterClient; +import glide.api.models.exceptions.RequestException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import lombok.Getter; import lombok.SneakyThrows; +import net.bytebuddy.utility.RandomString; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Timeout; @@ -111,4 +118,57 @@ public void client_can_handle_concurrent_workload(BaseClient client, int valueSi executorService.shutdown(); } + + private static Stream inflightRequestsLimitSizeAndClusterMode() { + return Stream.of( + Arguments.of(false, 5), + Arguments.of(false, 100), + Arguments.of(false, 1000), + Arguments.of(true, 5), + Arguments.of(true, 100), + Arguments.of(true, 1000)); + } + + @SneakyThrows + @ParameterizedTest() + @MethodSource("inflightRequestsLimitSizeAndClusterMode") + public void inflight_requests_limit(boolean clusterMode, int inflightRequestsLimit) { + BaseClient testClient; + String keyName = "nonexistkeylist" + RandomString.make(4); + + if (clusterMode) { + testClient = + GlideClient.createClient( + commonClientConfig().inflightRequestsLimit(inflightRequestsLimit).build()) + .get(); + } else { + testClient = + GlideClusterClient.createClient( + commonClusterClientConfig().inflightRequestsLimit(inflightRequestsLimit).build()) + .get(); + } + + // exercise + List> responses = new ArrayList<>(); + for (int i = 0; i < inflightRequestsLimit + 1; i++) { + responses.add(testClient.blpop(new String[] {keyName}, 0)); + } + + // verify + // Check that all requests except the last one are still pending + for (int i = 0; i < inflightRequestsLimit; i++) { + assertFalse(responses.get(i).isDone(), "Request " + i + " should still be pending"); + } + + // The last request should complete exceptionally + try { + responses.get(inflightRequestsLimit).get(100, TimeUnit.MILLISECONDS); + fail("Expected the last request to throw an exception"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof RequestException); + assertTrue(e.getCause().getMessage().contains("maximum inflight requests")); + } + + testClient.close(); + } }