Skip to content

Commit

Permalink
Java: Adding command WAIT
Browse files Browse the repository at this point in the history
Java: Adding command WAIT
  • Loading branch information
tjzhang-BQ authored and TJ Zhang committed Jun 29, 2024
1 parent f53ad16 commit e9f4a1c
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 0 deletions.
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ enum RequestType {
FunctionRestore = 197;
XPending = 198;
XGroupSetId = 199;
Wait = 200;
}

message Command {
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ pub enum RequestType {
FunctionRestore = 197,
XPending = 198,
XGroupSetId = 199,
Wait = 200,
}

fn get_two_word_command(first: &str, second: &str) -> Cmd {
Expand Down Expand Up @@ -419,6 +420,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::FunctionRestore => RequestType::FunctionRestore,
ProtobufRequestType::XPending => RequestType::XPending,
ProtobufRequestType::XGroupSetId => RequestType::XGroupSetId,
ProtobufRequestType::Wait => RequestType::Wait,
}
}
}
Expand Down Expand Up @@ -628,6 +630,7 @@ impl RequestType {
RequestType::FunctionRestore => Some(get_two_word_command("FUNCTION", "RESTORE")),
RequestType::XPending => Some(cmd("XPENDING")),
RequestType::XGroupSetId => Some(get_two_word_command("XGROUP", "SETID")),
RequestType::Wait => Some(cmd("WAIT")),
}
}
}
9 changes: 9 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -2905,4 +2906,12 @@ public CompletableFuture<Long> geosearchstore(
resultOptions.toArgs());
return commandManager.submitNewCommand(GeoSearchStore, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> wait(long numreplicas, long timeout) {
return commandManager.submitNewCommand(
Wait,
new String[] {Long.toString(numreplicas), Long.toString(timeout)},
this::handleLongResponse);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1207,4 +1207,21 @@ CompletableFuture<String> restore(
* }</pre>
*/
CompletableFuture<Long> sortStore(String key, String destination);

/**
* Blocks the current client until all the previous write commands are successfully transferred
* and acknowledged by at least <code>numreplicas</code> of replicas. If <code>timeout</code> is
* reached, the command returns even if the specified number of replicas were not yet reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds.
* @return The number of replicas reached by all the writes performed in the context of the
* current connection.
* @example
* <pre>{@code
* client.set("key", "value).get();
* assert client.wait(1L, 1000L).get() == 1L;
* }</pre>
*/
CompletableFuture<Long> wait(long numreplicas, long timeout);
}
17 changes: 17 additions & 0 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -5500,6 +5501,22 @@ public T geosearchstore(
return getThis();
}

/**
* Blocks the current client until all the previous write commands are successfully transferred
* and acknowledged by at least <code>numreplicas</code> of replicas. If <code>timeout</code> is
* reached, the command returns even if the specified number of replicas were not yet reached.
*
* @param numreplicas The number of replicas to reach.
* @param timeout The timeout value specified in milliseconds.
* @return Command Response - The number of replicas reached by all the writes performed in the
* context of the current connection.
*/
public T wait(long numreplicas, long timeout) {
ArgsArray args = buildArgs(Long.toString(numreplicas), Long.toString(timeout));
protobufTransaction.addCommands(buildCommand(Wait, args));
return getThis();
}

/** Build protobuf {@link Command} object for given command and arguments. */
protected Command buildCommand(RequestType requestType) {
return buildCommand(requestType, buildArgs());
Expand Down
25 changes: 25 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
Expand Down Expand Up @@ -8969,6 +8970,30 @@ public void sortStore_with_options_returns_success() {
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void wait_returns_success() {
// setup
long numreplicas = 1L;
long timeout = 1000L;
Long result = 5L;
String[] args = new String[] {"1", "1000"};

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(result);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(Wait), eq(args), any())).thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.wait(numreplicas, timeout);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(result, payload);
}

private static List<Arguments> getGeoSearchArguments() {
return List.of(
Arguments.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Wait;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
Expand Down Expand Up @@ -1336,6 +1337,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
"ANY",
"ASC")));

transaction.wait(1L, 1000L);
results.add(Pair.of(Wait, buildArgs("1", "1000")));

var protobufTransaction = transaction.getProtobufTransaction().build();

for (int idx = 0; idx < protobufTransaction.getCommandsCount(); idx++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,24 @@ public void sort() {

assertDeepEquals(expectedResult, results);
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
ClusterTransaction transaction = new ClusterTransaction();

transaction.set(key, "value").wait(numreplicas, timeout);
Object[] results = clusterClient.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((Long) expectedResult[1] <= (Long) results[1]);
}
}
17 changes: 17 additions & 0 deletions java/integTest/src/test/java/glide/cluster/CommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1846,4 +1846,21 @@ public void sort() {
.get());
assertArrayEquals(key2DescendingListSubset, clusterClient.lrange(key3, 0, -1).get());
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;

assertEquals(OK, clusterClient.set(key, "value").get());
assertTrue(clusterClient.wait(numreplicas, timeout).get() >= 1);

// command should fail on a negative timeout value
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> clusterClient.wait(1L, -1L).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}
}
17 changes: 17 additions & 0 deletions java/integTest/src/test/java/glide/standalone/CommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1032,4 +1032,21 @@ public void sort() {
.get());
assertArrayEquals(namesSortedByAge, regularClient.lrange(storeKey, 0, -1).get());
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;

assertEquals(OK, regularClient.set(key, "value").get());
assertTrue(regularClient.wait(numreplicas, timeout).get() >= 0);

// command should fail on a negative timeout value
ExecutionException executionException =
assertThrows(ExecutionException.class, () -> regularClient.wait(1L, -1L).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,26 @@ public void sort_and_sortReadOnly() {
assertArrayEquals(expectedResults, client.exec(transaction2).get());
}
}

@SneakyThrows
@Test
public void waitTest() {
// setup
String key = UUID.randomUUID().toString();
long numreplicas = 1L;
long timeout = 1000L;
Transaction transaction = new Transaction();

transaction.set(key, "value");
transaction.wait(numreplicas, timeout);

Object[] results = client.exec(transaction).get();
Object[] expectedResult =
new Object[] {
OK, // set(key, "value")
0L, // wait(numreplicas, timeout)
};
assertEquals(expectedResult[0], results[0]);
assertTrue((long) expectedResult[1] <= (long) results[1]);
}
}

0 comments on commit e9f4a1c

Please sign in to comment.