Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add XGROUP SETID command #1720

Merged
merged 6 commits into from
Jun 30, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -1973,18 +1974,18 @@ public CompletableFuture<Map<String, String[][]>> xrevrange(

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key, @NonNull String groupname, @NonNull String id) {
@NonNull String key, @NonNull String groupName, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupCreate, new String[] {key, groupname, id}, this::handleStringResponse);
XGroupCreate, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String groupName,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] arguments = concatenateArrays(new String[] {key, groupname, id}, options.toArgs());
String[] arguments = concatenateArrays(new String[] {key, groupName, id}, options.toArgs());
return commandManager.submitNewCommand(XGroupCreate, arguments, this::handleStringResponse);
}

Expand All @@ -2008,6 +2009,23 @@ public CompletableFuture<Long> xgroupDelConsumer(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key, @NonNull String groupName, @NonNull String id) {
return commandManager.submitNewCommand(
XGroupSetId, new String[] {key, groupName, id}, this::handleStringResponse);
}

@Override
public CompletableFuture<String> xgroupSetId(
@NonNull String key,
@NonNull String groupName,
@NonNull String id,
@NonNull String entriesReadId) {
String[] arguments = new String[] {key, groupName, id, "ENTRIESREAD", entriesReadId};
return commandManager.submitNewCommand(XGroupSetId, arguments, this::handleStringResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds, @NonNull String group, @NonNull String consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
Expand All @@ -391,7 +391,7 @@ CompletableFuture<Map<String, String[][]>> xrevrange(
* }</pre>
*/
CompletableFuture<String> xgroupCreate(
String key, String groupname, String id, StreamGroupOptions options);
String key, String groupName, String id, StreamGroupOptions options);

/**
* Destroys the consumer group <code>groupname</code> for the stream stored at <code>key</code>.
Expand Down Expand Up @@ -443,6 +443,45 @@ CompletableFuture<String> xgroupCreate(
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);

/**
* Sets the last delivered ID for a consumer group.
*
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Update consumer group "mygroup", to set the last delivered entry ID.
* assert client.xgroupSetId("mystream", "mygroup", "0").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupSetId(String key, String groupName, String id);

/**
* Sets the last delivered ID for a consumer group.
*
* @since Redis 7.0 and above
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @param entriesReadId An arbitrary ID (that isn't the first ID, last ID, or the zero ID (<code>
* "0-0"</code>)) used to find out how many entries are between the arbitrary ID (excluding
* it) and the stream's last entry.
* @return <code>OK</code>.
* @example
* <pre>{@code
* // Update consumer group "mygroup", to set the last delivered entry ID.
* assert client.xgroupSetId("mystream", "mygroup", "0", "1-1").get().equals("OK");
* }</pre>
*/
CompletableFuture<String> xgroupSetId(
String key, String groupName, String id, String entriesReadId);

/**
* Reads entries from the given streams owned by a consumer group.
*
Expand Down
52 changes: 46 additions & 6 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -3024,14 +3025,14 @@ public T xrevrange(
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
* @return Command Response - <code>OK</code>.
*/
public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupname, id)));
public T xgroupCreate(@NonNull String key, @NonNull String groupName, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupCreate, buildArgs(key, groupName, id)));
return getThis();
}

Expand All @@ -3041,7 +3042,7 @@ public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull S
*
* @see <a href="https://valkey.io/commands/xgroup-create/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupname The newly created consumer group name.
* @param groupName The newly created consumer group name.
* @param id Stream entry ID that specifies the last delivered entry in the stream from the new
* group’s perspective. The special ID <code>"$"</code> can be used to specify the last entry
* in the stream.
Expand All @@ -3050,11 +3051,11 @@ public T xgroupCreate(@NonNull String key, @NonNull String groupname, @NonNull S
*/
public T xgroupCreate(
@NonNull String key,
@NonNull String groupname,
@NonNull String groupName,
@NonNull String id,
@NonNull StreamGroupOptions options) {
String[] commandArgs =
buildArgs(concatenateArrays(new String[] {key, groupname, id}, options.toArgs()));
buildArgs(concatenateArrays(new String[] {key, groupName, id}, options.toArgs()));
protobufTransaction.addCommands(buildCommand(XGroupCreate, commandArgs));
return getThis();
}
Expand Down Expand Up @@ -3107,6 +3108,45 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
return getThis();
}

/**
* Sets the last delivered ID for a consumer group.
*
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @return Command Response - <code>OK</code>.
*/
public T xgroupSetId(@NonNull String key, @NonNull String groupName, @NonNull String id) {
protobufTransaction.addCommands(buildCommand(XGroupSetId, buildArgs(key, groupName, id)));
return getThis();
}

/**
* Sets the last delivered ID for a consumer group.
*
* @since Redis 7.0 and above
* @see <a href="https://valkey.io/commands/xgroup-setid/">valkey.io</a> for details.
* @param key The key of the stream.
* @param groupName The consumer group name.
* @param id The stream entry ID that should be set as the last delivered ID for the consumer
* group.
* @param entriesReadId An arbitrary ID (that isn't the first ID, last ID, or the zero ID (<code>
* "0-0"</code>)) used to find out how many entries are between the arbitrary ID (excluding
* it) and the stream's last entry.
* @return Command Response - <code>OK</code>.
*/
public T xgroupSetId(
@NonNull String key,
@NonNull String groupName,
@NonNull String id,
@NonNull String entriesReadId) {
String[] commandArgs = buildArgs(key, groupName, id, "ENTRIESREAD", entriesReadId);
protobufTransaction.addCommands(buildCommand(XGroupSetId, commandArgs));
return getThis();
}

/**
* Reads entries from the given streams owned by a consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
@Builder
public final class StreamGroupOptions {

// Redis API String argument for makeStream
public static final String MAKE_STREAM_REDIS_API = "MKSTREAM";
// Valkey API String argument for makeStream
jduo marked this conversation as resolved.
Show resolved Hide resolved
public static final String MAKE_STREAM_VALKEY_API = "MKSTREAM";

// Redis API String argument for entriesRead
public static final String ENTRIES_READ_REDIS_API = "ENTRIESREAD";
// Valkey API String argument for entriesRead
public static final String ENTRIES_READ_VALKEY_API = "ENTRIESREAD";

/**
* If <code>true</code> and the stream doesn't exist, creates a new stream with a length of <code>
Expand Down Expand Up @@ -54,11 +54,11 @@ public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (this.mkStream) {
optionArgs.add(MAKE_STREAM_REDIS_API);
optionArgs.add(MAKE_STREAM_VALKEY_API);
}

if (this.entriesRead != null) {
optionArgs.add(ENTRIES_READ_REDIS_API);
optionArgs.add(ENTRIES_READ_VALKEY_API);
optionArgs.add(this.entriesRead);
}

Expand Down
60 changes: 57 additions & 3 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import static glide.api.models.commands.scan.BaseScanOptions.COUNT_OPTION_STRING;
import static glide.api.models.commands.scan.BaseScanOptions.MATCH_OPTION_STRING;
import static glide.api.models.commands.stream.StreamAddOptions.NO_MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.EXCLUSIVE_RANGE_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
Expand Down Expand Up @@ -225,6 +225,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -5785,7 +5786,9 @@ public void xgroupCreate_withOptions() {
StreamGroupOptions options =
StreamGroupOptions.builder().makeStream().entriesRead(testEntry).build();
String[] arguments =
new String[] {key, groupName, id, MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, testEntry};
new String[] {
key, groupName, id, MAKE_STREAM_VALKEY_API, ENTRIES_READ_VALKEY_API, testEntry
};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);
Expand Down Expand Up @@ -5879,6 +5882,57 @@ public void xgroupDelConsumer() {
assertEquals(result, payload);
}

@SneakyThrows
@Test
public void xgroupSetid() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String id = "testId";
String[] arguments = new String[] {key, groupName, id};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(XGroupSetId), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.xgroupSetId(key, groupName, id);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void xgroupSetidWithEntriesRead() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String id = "testId";
String entriesRead = "1-1";
String[] arguments = new String[] {key, groupName, id, "ENTRIESREAD", entriesRead};

CompletableFuture<String> testResponse = new CompletableFuture<>();
testResponse.complete(OK);

// match on protobuf request
when(commandManager.<String>submitNewCommand(eq(XGroupSetId), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<String> response = service.xgroupSetId(key, groupName, id, entriesRead);
String payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(OK, payload);
}

@SneakyThrows
@Test
public void xreadgroup_multiple_keys() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import static glide.api.models.commands.geospatial.GeoAddOptions.CHANGED_REDIS_API;
import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMLONLAT_VALKEY_API;
import static glide.api.models.commands.geospatial.GeoSearchOrigin.FROMMEMBER_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_REDIS_API;
import static glide.api.models.commands.stream.StreamGroupOptions.ENTRIES_READ_VALKEY_API;
import static glide.api.models.commands.stream.StreamGroupOptions.MAKE_STREAM_VALKEY_API;
import static glide.api.models.commands.stream.StreamPendingOptions.IDLE_TIME_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.EXCLUSIVE_RANGE_REDIS_API;
import static glide.api.models.commands.stream.StreamRange.MAXIMUM_RANGE_REDIS_API;
Expand Down Expand Up @@ -188,6 +188,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreateConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDelConsumer;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupDestroy;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupSetId;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XPending;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
Expand Down Expand Up @@ -817,7 +818,7 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
Pair.of(
XGroupCreate,
buildArgs(
"key", "group", "id", MAKE_STREAM_REDIS_API, ENTRIES_READ_REDIS_API, "entry")));
"key", "group", "id", MAKE_STREAM_VALKEY_API, ENTRIES_READ_VALKEY_API, "entry")));

transaction.xgroupDestroy("key", "group");
results.add(Pair.of(XGroupDestroy, buildArgs("key", "group")));
Expand All @@ -835,6 +836,12 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
buildArgs(
READ_GROUP_REDIS_API, "group", "consumer", READ_STREAMS_REDIS_API, "key", "id")));

transaction.xgroupSetId("key", "group", "id");
results.add(Pair.of(XGroupSetId, buildArgs("key", "group", "id")));

transaction.xgroupSetId("key", "group", "id", "1-1");
results.add(Pair.of(XGroupSetId, buildArgs("key", "group", "id", "ENTRIESREAD", "1-1")));

transaction.xreadgroup(
Map.of("key", "id"),
"group",
Expand Down
Loading
Loading