Skip to content

Commit

Permalink
Java: Add XGROUP SETID command (valkey-io#1720)
Browse files Browse the repository at this point in the history
* Initial implementation of XGroupSetId

* Unit tests

* Add integration tests

* PR feedback

* Address PR comments

doc updates

* Add 7.0.0 transaction integration test
  • Loading branch information
jduo committed Jun 30, 2024
1 parent f6fb3d0 commit 9271f2c
Show file tree
Hide file tree
Showing 8 changed files with 302 additions and 25 deletions.
26 changes: 22 additions & 4 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -1973,18 +1974,18 @@ public CompletableFuture<Map<String, String[][]>> xrevrange(

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key, @NonNull String groupname, @NonNull String id) {
@NonNull String key, @NonNull String groupName, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupCreate, new String[] {key, groupname, id}, this::handleStringResponse);
XGroupCreate, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String groupName,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] arguments = concatenateArrays(new String[] {key, groupname, id}, options.toArgs());
String[] arguments = concatenateArrays(new String[] {key, groupName, id}, options.toArgs());
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

Expand All @@ -2008,6 +2009,23 @@ public CompletableFuture<Long> xgroupDelConsumer(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key, @NonNull String groupName, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupSetId, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key,
@NonNull String groupName,
@NonNull String id,
@NonNull String entriesReadId) {
String[] arguments = new String[] {key, groupName, id, "ENTRIESREAD", entriesReadId};
return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds, @NonNull String group, @NonNull String consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
Expand All @@ -391,7 +391,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
* }</pre>
*/
CompletableFuture<String> xgroupCreate(
String key, String groupname, String id, StreamGroupOptions options);
String key, String groupName, String id, StreamGroupOptions options);

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
Expand Down Expand Up @@ -443,6 +443,45 @@ CompletableFuture<String> xgroupCreate(
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);

/**
* Sets the last delivered ID for a consumer group.
*
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Update consumer group "mygroup", to set the last delivered entry ID.
* assert client.xgroupSetId("mystream", "mygroup", "0").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupSetId(String key, String groupName, String id);

/**
* Sets the last delivered ID for a consumer group.
*
* @since Redis 7.0 and above
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @param entriesReadId An arbitrary ID (that isn't the first ID, last ID, or the zero ID (<code>
* "0-0"</code>)) used to find out how many entries are between the arbitrary ID (excluding
* it) and the stream's last entry.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Update consumer group "mygroup", to set the last delivered entry ID.
* assert client.xgroupSetId("mystream", "mygroup", "0", "1-1").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupSetId(
String key, String groupName, String id, String entriesReadId);

/**
* Reads entries from the given streams owned by a consumer group.
*
Expand Down
52 changes: 46 additions & 6 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -3024,14 +3025,14 @@ public T xrevrange(
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupname, id)));
public T xgroupCreate(@NonNull String key, @NonNull String groupName, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupName, id)));
return getThis();
}

Expand All @@ -3041,7 +3042,7 @@ public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull S
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
Expand All @@ -3050,11 +3051,11 @@ public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull S
*/
public T xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String groupName,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] commandArgs =
buildArgs(concatenateArrays(new String[] {key, groupname, id}, options.toArgs()));
buildArgs(concatenateArrays(new String[] {key, groupName, id}, options.toArgs()));
protobufTransaction.addCommands(buildCommand(XGroupCreate, commandArgs));
return getThis();
}
Expand Down Expand Up @@ -3107,6 +3108,45 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
return getThis();
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @return Command Response - <code>OK</code>.
*/
public T xgroupSetId(@NonNull String key, @NonNull String groupName, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupSetId, buildArgs(key, groupName, id)));
return getThis();
}

/**
* Sets the last delivered ID for a consumer group.
*
* @since Redis 7.0 and above
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @param entriesReadId An arbitrary ID (that isn't the first ID, last ID, or the zero ID (<code>
* "0-0"</code>)) used to find out how many entries are between the arbitrary ID (excluding
* it) and the stream's last entry.
* @return Command Response - <code>OK</code>.
*/
public T xgroupSetId(
@NonNull String key,
@NonNull String groupName,
@NonNull String id,
@NonNull String entriesReadId) {
String[] commandArgs = buildArgs(key, groupName, id, "ENTRIESREAD", entriesReadId);
protobufTransaction.addCommands(buildCommand(XGroupSetId, commandArgs));
return getThis();
}

/**
* Reads entries from the given streams owned by a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
@Builder
public final class StreamGroupOptions {

// Redis API String argument for makeStream
public static final String MAKE_STREAM_REDIS_API = "MKSTREAM";
// Valkey API String argument for makeStream
public static final String MAKE_STREAM_VALKEY_API = "MKSTREAM";

// Redis API String argument for entriesRead
public static final String ENTRIES_READ_REDIS_API = "ENTRIESREAD";
// Valkey API String argument for entriesRead
public static final String ENTRIES_READ_VALKEY_API = "ENTRIESREAD";

/**
* If <code>true</code> and the stream doesn't exist, creates a new stream with a length of <code>
Expand Down Expand Up @@ -54,11 +54,11 @@ public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (this.mkStream) {
optionArgs.add(MAKE_STREAM_REDIS_API);
optionArgs.add(MAKE_STREAM_VALKEY_API);
}

if (this.entriesRead != null) {
optionArgs.add(ENTRIES_READ_REDIS_API);
optionArgs.add(ENTRIES_READ_VALKEY_API);
optionArgs.add(this.entriesRead);
}

Expand Down
60 changes: 57 additions & 3 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import static glide.api.models.commands.scan.BaseScanOptions.COUNT_OPTION_STRING;
import static glide.api.models.commands.scan.BaseScanOptions.MATCH_OPTION_STRING;
import static glide.api.models.commands.stream.StreamAddOptions.NO_MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.EXCLUSIVE_RANGE_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
Expand Down Expand Up @@ -225,6 +225,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -5785,7 +5786,9 @@ public void xgroupCreate_withOptions() {
StreamGroupOptions options =
StreamGroupOptions.builder().makeStream().entriesRead(testEntry).build();
String[] arguments =
new String[] {key, groupName, id, MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, testEntry};
new String[] {
key, groupName, id, MAKE_STREAM_VALKEY_API, ENTRIES_READ_VALKEY_API, testEntry
};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);
Expand Down Expand Up @@ -5879,6 +5882,57 @@ public void xgroupDelConsumer() {
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void xgroupSetid() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String id = "testId";
String[] arguments = new String[] {key, groupName, id};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(XGroupSetId), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.xgroupSetId(key, groupName, id);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void xgroupSetidWithEntriesRead() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String id = "testId";
String entriesRead = "1-1";
String[] arguments = new String[] {key, groupName, id, "ENTRIESREAD", entriesRead};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(XGroupSetId), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.xgroupSetId(key, groupName, id, entriesRead);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void xreadgroup_multiple_keys() {
Expand Down
13 changes: 10 additions & 3 deletions java/client/src/test/java/glide/api/models/TransactionTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API;
import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMLONLAT_VALKEY_API;
import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMMEMBER_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.EXCLUSIVE_RANGE_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
Expand Down Expand Up @@ -188,6 +188,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -817,7 +818,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
Pair.of(
XGroupCreate,
buildArgs(
"key", "group", "id", MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, "entry")));
"key", "group", "id", MAKE_STREAM_VALKEY_API, ENTRIES_READ_VALKEY_API, "entry")));

transaction.xgroupDestroy("key", "group");
results.add(Pair.of(XGroupDestroy, buildArgs("key", "group")));
Expand All @@ -835,6 +836,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
buildArgs(
READ_GROUP_REDIS_API, "group", "consumer", READ_STREAMS_REDIS_API, "key", "id")));

transaction.xgroupSetId("key", "group", "id");
results.add(Pair.of(XGroupSetId, buildArgs("key", "group", "id")));

transaction.xgroupSetId("key", "group", "id", "1-1");
results.add(Pair.of(XGroupSetId, buildArgs("key", "group", "id", "ENTRIESREAD", "1-1")));

transaction.xreadgroup(
Map.of("key", "id"),
"group",
Expand Down
Loading

0 comments on commit 9271f2c

Please sign in to comment.