Skip to content

Commit

Permalink
Added inflightRequestsLimit client config to java (valkey-io#2408)
Browse files Browse the repository at this point in the history
* Add inflight request limit config to java

Signed-off-by: GilboaAWS <gilboabg@amazon.com>
  • Loading branch information
GilboaAWS authored Oct 10, 2024
1 parent 1ce22c6 commit d61ed73
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,12 @@ public abstract class BaseClientConfiguration {
private final ThreadPoolResource threadPoolResource;

public abstract BaseSubscriptionConfiguration getSubscriptionConfiguration();

/**
* The maximum number of concurrent requests allowed to be in-flight (sent but not yet completed).
* This limit is used to control the memory usage and prevent the client from overwhelming the
* server or getting stuck in case of a queue backlog. If not set, a default value of 1000 will be
* used.
*/
private final Integer inflightRequestsLimit;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* .databaseId(1)
* .clientName("GLIDE")
* .subscriptionConfiguration(subscriptionConfiguration)
* .inflightRequestsLimit(1000)
* .build();
* }</pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* .requestTimeout(2000)
* .clientName("GLIDE")
* .subscriptionConfiguration(subscriptionConfiguration)
* .inflightRequestsLimit(1000)
* .build();
* }</pre>
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ private ConnectionRequest.Builder setupConnectionRequestBuilderBaseConfiguration
connectionRequestBuilder.setClientName(configuration.getClientName());
}

if (configuration.getInflightRequestsLimit() != null) {
connectionRequestBuilder.setInflightRequestsLimit(configuration.getInflightRequestsLimit());
}

return connectionRequestBuilder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class ConnectionManagerTest {

private static final String CLIENT_NAME = "ClientName";

private static final int INFLIGHT_REQUESTS_LIMIT = 1000;

@BeforeEach
public void setUp() {
channel = mock(ChannelHandler.class);
Expand Down Expand Up @@ -149,6 +151,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
.subscription(EXACT, gs("channel_2"))
.subscription(PATTERN, gs("*chatRoom*"))
.build())
.inflightRequestsLimit(INFLIGHT_REQUESTS_LIMIT)
.build();
ConnectionRequest expectedProtobufConnectionRequest =
ConnectionRequest.newBuilder()
Expand Down Expand Up @@ -193,6 +196,7 @@ public void connection_request_protobuf_generation_with_all_fields_set() {
ByteString.copyFrom(gs("*chatRoom*").getBytes()))
.build()))
.build())
.setInflightRequestsLimit(INFLIGHT_REQUESTS_LIMIT)
.build();
CompletableFuture<Response> completedFuture = new CompletableFuture<>();
Response response = Response.newBuilder().setConstantResponse(ConstantResponse.OK).build();
Expand Down
60 changes: 60 additions & 0 deletions java/integTest/src/test/java/glide/SharedClientTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@
import static glide.TestUtilities.getRandomString;
import static glide.api.BaseClient.OK;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import glide.api.BaseClient;
import glide.api.GlideClient;
import glide.api.GlideClusterClient;
import glide.api.models.exceptions.RequestException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.Getter;
import lombok.SneakyThrows;
import net.bytebuddy.utility.RandomString;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -111,4 +118,57 @@ public void client_can_handle_concurrent_workload(BaseClient client, int valueSi

executorService.shutdown();
}

private static Stream<Arguments> inflightRequestsLimitSizeAndClusterMode() {
return Stream.of(
Arguments.of(false, 5),
Arguments.of(false, 100),
Arguments.of(false, 1000),
Arguments.of(true, 5),
Arguments.of(true, 100),
Arguments.of(true, 1000));
}

@SneakyThrows
@ParameterizedTest()
@MethodSource("inflightRequestsLimitSizeAndClusterMode")
public void inflight_requests_limit(boolean clusterMode, int inflightRequestsLimit) {
BaseClient testClient;
String keyName = "nonexistkeylist" + RandomString.make(4);

if (clusterMode) {
testClient =
GlideClient.createClient(
commonClientConfig().inflightRequestsLimit(inflightRequestsLimit).build())
.get();
} else {
testClient =
GlideClusterClient.createClient(
commonClusterClientConfig().inflightRequestsLimit(inflightRequestsLimit).build())
.get();
}

// exercise
List<CompletableFuture<String[]>> responses = new ArrayList<>();
for (int i = 0; i < inflightRequestsLimit + 1; i++) {
responses.add(testClient.blpop(new String[] {keyName}, 0));
}

// verify
// Check that all requests except the last one are still pending
for (int i = 0; i < inflightRequestsLimit; i++) {
assertFalse(responses.get(i).isDone(), "Request " + i + " should still be pending");
}

// The last request should complete exceptionally
try {
responses.get(inflightRequestsLimit).get(100, TimeUnit.MILLISECONDS);
fail("Expected the last request to throw an exception");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof RequestException);
assertTrue(e.getCause().getMessage().contains("maximum inflight requests"));
}

testClient.close();
}
}

0 comments on commit d61ed73

Please sign in to comment.