Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Adding command WAIT #1707

Merged
merged 12 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ fn get_request_timeout(cmd: &Cmd, default_timeout: Duration) -> RedisResult<Opti
.position(b"BLOCK")
.map(|idx| get_timeout_from_cmd_arg(cmd, idx + 1, TimeUnit::Milliseconds))
.unwrap_or(Ok(RequestTimeoutOption::ClientConfig)),
b"WAIT" => get_timeout_from_cmd_arg(cmd, 2, TimeUnit::Milliseconds),
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
_ => Ok(RequestTimeoutOption::ClientConfig),
}?;

Expand Down Expand Up @@ -736,6 +737,17 @@ mod tests {
0.857 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);

let mut cmd = Cmd::new();
cmd.arg("WAIT").arg(1).arg("500");
let result = get_request_timeout(&cmd, Duration::from_millis(500));
assert!(result.is_ok());
assert_eq!(
result.unwrap(),
Some(Duration::from_secs_f64(
0.5 + BLOCKING_CMD_TIMEOUT_EXTENSION
))
);
}

#[test]
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ enum RequestType {
SScan = 200;
ZScan = 201;
HScan = 202;
Wait = 208;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub enum RequestType {
SScan = 200,
ZScan = 201,
HScan = 202,
Wait = 208,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -425,6 +426,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::SScan => RequestType::SScan,
ProtobufRequestType::ZScan => RequestType::ZScan,
ProtobufRequestType::HScan => RequestType::HScan,
ProtobufRequestType::Wait => RequestType::Wait,
}
}
}
Expand Down Expand Up @@ -637,6 +639,7 @@ impl RequestType {
RequestType::SScan => Some(cmd("SSCAN")),
RequestType::ZScan => Some(cmd("ZSCAN")),
RequestType::HScan => Some(cmd("HSCAN")),
RequestType::Wait => Some(cmd("WAIT")),
}
}
}
9 changes: 9 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -2950,4 +2951,12 @@ public CompletableFuture<Object[]> hscan(
String[] arguments = concatenateArrays(new String[] {key, cursor}, hScanOptions.toArgs());
return commandManager.submitNewCommand(HScan, arguments, this::handleArrayResponse);
}

@Override
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
return commandManager.submitNewCommand(
Wait,
new String[] {Long.toString(numreplicas), Long.toString(timeout)},
this::handleLongResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,4 +1207,22 @@ CompletableFuture<String> restore(
* }</pre>
*/
CompletableFuture<Long> sortStore(String key, String destination);

/**
* Blocks the current client until all the previous write commands are successfully transferred
* and acknowledged by at least <code>numreplicas</code> of replicas. If <code>timeout</code> is
* reached, the command returns even if the specified number of replicas were not yet reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds. A value of <code>0</code> will
* block indefinitely.
* @return The number of replicas reached by all the writes performed in the context of the
* current connection.
* @example
* <pre>{@code
* client.set("key", "value).get();
* assert client.wait(1L, 1000L).get() == 1L;
* }</pre>
*/
CompletableFuture<Long> wait(long numreplicas, long timeout);
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
}
17 changes: 17 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -5623,6 +5624,22 @@ public T hscan(@NonNull String key, @NonNull String cursor, @NonNull HScanOption
return getThis();
}

/**
* Returns the number of replicas that acknowledged the write commands sent by the current client
* before this command, both in the case where the specified number of replicas are reached, or
* when the timeout is reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds.
* @return Command Response - The number of replicas reached by all the writes performed in the
* context of the current connection.
*/
public T wait(long numreplicas, long timeout) {
String[] args = buildArgs(Long.toString(numreplicas), Long.toString(timeout));
protobufTransaction.addCommands(buildCommand(Wait, args));
return getThis();
}

/** Build protobuf {@link Command} object for given command and arguments. */
protected Command buildCommand(RequestType requestType) {
// An empty args array is still needed for parameter-less commands.
Expand Down
25 changes: 25 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -9002,6 +9003,30 @@ public void sortStore_with_options_returns_success() {
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void wait_returns_success() {
// setup
long numreplicas = 1L;
long timeout = 1000L;
Long result = 5L;
String[] args = new String[] {"1", "1000"};

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(result);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(Wait), eq(args), any())).thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.wait(numreplicas, timeout);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void sscan_with_options_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -1387,6 +1388,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
HScanOptions.COUNT_OPTION_STRING,
"10")));

transaction.wait(1L, 1000L);
results.add(Pair.of(Wait, buildArgs("1", "1000")));

var protobufTransaction = transaction.getProtobufTransaction().build();

for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) {
Expand Down
43 changes: 43 additions & 0 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -8052,4 +8052,47 @@ public void hscan(BaseClient client) {
() -> client.hscan(key1, "-1", HScanOptions.builder().count(-1L).build()).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void waitTest(BaseClient client) {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

// assert that wait returns 0 under standalone and 1 under cluster mode.
assertEquals(OK, client.set(key, "value").get());
assertTrue(client.wait(numreplicas, timeout).get() >= (client instanceof RedisClient ? 0 : 1));
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved

// command should fail on a negative timeout value
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> client.wait(1L, -1L).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void wait_timeout_check(BaseClient client) {
String key = UUID.randomUUID().toString();
// create new client with default request timeout (250 millis)
try (var testClient =
client instanceof RedisClient
? RedisClient.CreateClient(commonClientConfig().build()).get()
: RedisClusterClient.CreateClient(commonClusterClientConfig().build()).get()) {

// ensure that commands do not time out, even if timeout > request timeout
assertEquals(OK, testClient.set(key, "value").get());
assertEquals((client instanceof RedisClient ? 0 : 1), testClient.wait(1L, 1000L).get());
tjzhang-BQ marked this conversation as resolved.
Show resolved Hide resolved

// with 0 timeout (no timeout) wait should block indefinitely,
// but we wrap the test with timeout to avoid test failing or being stuck forever
assertEquals(OK, testClient.set(key, "value2").get());
assertThrows(
tjzhang-BQ marked this conversation as resolved.
Show resolved Hide resolved
TimeoutException.class, // <- future timeout, not command timeout
() -> testClient.wait(100L, 0L).get(1000, TimeUnit.MILLISECONDS));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,4 +308,24 @@ public void sort() {

assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
ClusterTransaction transaction = new ClusterTransaction();

transaction.set(key, "value").wait(numreplicas, timeout);
Object[] results = clusterClient.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((Long) expectedResult[1] <= (Long) results[1]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,26 @@ public void sort_and_sortReadOnly() {
assertArrayEquals(expectedResults, client.exec(transaction2).get());
}
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
Transaction transaction = new Transaction();

transaction.set(key, "value");
transaction.wait(numreplicas, timeout);

Object[] results = client.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((long) expectedResult[1] <= (long) results[1]);
}
}
Loading