Skip to content

Commit

Permalink
Node: Add XINFO GROUPS command. (#2122)
Browse files Browse the repository at this point in the history
Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
Co-authored-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
Yury-Fridlyand and acarbonetto authored Aug 16, 2024
1 parent fc5a64b commit 1195c82
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122))
* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105))
* Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109))
* Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132))
Expand Down
42 changes: 41 additions & 1 deletion node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
GeoUnit,
GeospatialData,
InsertPosition,
KeyWeight, // eslint-disable-line @typescript-eslint/no-unused-vars
KeyWeight,
LPosOptions,
ListDirection,
MemberOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars
Expand Down Expand Up @@ -176,6 +176,7 @@ import {
createXGroupDelConsumer,
createXGroupDestroy,
createXInfoConsumers,
createXInfoGroups,
createXInfoStream,
createXLen,
createXPending,
Expand Down Expand Up @@ -4405,6 +4406,45 @@ export class BaseClient {
return this.createWritePromise(createXInfoConsumers(key, group));
}

/**
* Returns the list of all consumer groups and their attributes for the stream stored at `key`.
*
* @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details.
*
* @param key - The key of the stream.
* @returns An array of maps, where each mapping represents the
* attributes of a consumer group for the stream at `key`.
* @example
* ```typescript
* <pre>{@code
* const result = await client.xinfoGroups("my_stream");
* console.log(result); // Output:
* // [
* // {
* // "name": "mygroup",
* // "consumers": 2,
* // "pending": 2,
* // "last-delivered-id": "1638126030001-0",
* // "entries-read": 2, // Added in version 7.0.0
* // "lag": 0 // Added in version 7.0.0
* // },
* // {
* // "name": "some-other-group",
* // "consumers": 1,
* // "pending": 0,
* // "last-delivered-id": "0-0",
* // "entries-read": null, // Added in version 7.0.0
* // "lag": 1 // Added in version 7.0.0
* // }
* // ]
* ```
*/
public async xinfoGroups(
key: string,
): Promise<Record<string, string | number | null>[]> {
return this.createWritePromise(createXInfoGroups(key));
}

/**
* Changes the ownership of a pending message.
*
Expand Down
5 changes: 5 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2453,6 +2453,11 @@ export function createXInfoStream(
return createCommand(RequestType.XInfoStream, args);
}

/** @internal */
export function createXInfoGroups(key: string): command_request.Command {
return createCommand(RequestType.XInfoGroups, [key]);
}

/**
* @internal
*/
Expand Down
15 changes: 15 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ import {
createXGroupDelConsumer,
createXGroupDestroy,
createXInfoConsumers,
createXInfoGroups,
createXInfoStream,
createXLen,
createXPending,
Expand Down Expand Up @@ -2331,6 +2332,20 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXInfoStream(key, fullOptions ?? false));
}

/**
* Returns the list of all consumer groups and their attributes for the stream stored at `key`.
*
* @see {@link https://valkey.io/commands/xinfo-groups/|valkey.io} for details.
*
* @param key - The key of the stream.
*
* Command Response - An `Array` of `Records`, where each mapping represents the
* attributes of a consumer group for the stream at `key`.
*/
public xinfoGroups(key: string): T {
return this.addAndReturn(createXInfoGroups(key));
}

/** Returns the server time.
* @see {@link https://valkey.io/commands/time/|valkey.io} for details.
*
Expand Down
192 changes: 191 additions & 1 deletion node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5570,7 +5570,6 @@ export function runBaseTests<Context>(config: {
"last-entry": (string | number | string[])[];
groups: number;
};
console.log(result);

// verify result:
expect(result.length).toEqual(1);
Expand Down Expand Up @@ -8232,6 +8231,197 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xinfogroups xinfo groups %p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster) => {
const key = uuidv4();
const stringKey = uuidv4();
const groupName1 = uuidv4();
const consumer1 = uuidv4();
const streamId1 = "0-1";
const streamId2 = "0-2";
const streamId3 = "0-3";

expect(
await client.xgroupCreate(key, groupName1, "0-0", {
mkStream: true,
}),
).toEqual("OK");

// one empty group exists
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
},
]
: [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
"entries-read": null,
lag: 0,
},
],
);

expect(
await client.xadd(
key,
[
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
{ id: streamId1 },
),
).toEqual(streamId1);

expect(
await client.xadd(
key,
[
["entry2_field1", "entry2_value1"],
["entry2_field2", "entry2_value2"],
],
{ id: streamId2 },
),
).toEqual(streamId2);

expect(
await client.xadd(
key,
[["entry3_field1", "entry3_value1"]],
{ id: streamId3 },
),
).toEqual(streamId3);

// same as previous check, bug lag = 3, there are 3 messages unread
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
},
]
: [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
"entries-read": null,
lag: 3,
},
],
);

expect(
await client.customCommand([
"XREADGROUP",
"GROUP",
groupName1,
consumer1,
"STREAMS",
key,
">",
]),
).toEqual({
[key]: {
[streamId1]: [
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
[streamId2]: [
["entry2_field1", "entry2_value1"],
["entry2_field2", "entry2_value2"],
],
[streamId3]: [["entry3_field1", "entry3_value1"]],
},
});
// after reading, `lag` is reset, and `pending`, consumer count and last ID are set
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 1,
pending: 3,
"last-delivered-id": streamId3,
},
]
: [
{
name: groupName1,
consumers: 1,
pending: 3,
"last-delivered-id": streamId3,
"entries-read": 3,
lag: 0,
},
],
);

expect(
await client.customCommand([
"XACK",
key,
groupName1,
streamId1,
]),
).toEqual(1);
// once message ack'ed, pending counter decreased
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 1,
pending: 2,
"last-delivered-id": streamId3,
},
]
: [
{
name: groupName1,
consumers: 1,
pending: 2,
"last-delivered-id": streamId3,
"entries-read": 3,
lag: 0,
},
],
);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(client.xinfoGroups(stringKey)).rejects.toThrow(
RequestError,
);

// Passing a non-existing key raises an error
const key2 = uuidv4();
await expect(client.xinfoGroups(key2)).rejects.toThrow(
RequestError,
);
// create a second stream
await client.xadd(key2, [["a", "b"]]);
// no group yet exists
expect(await client.xinfoGroups(key2)).toEqual([]);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xpending test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,8 @@ export async function transactionTest(
'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }',
1,
]);
baseTransaction.xinfoGroups(key9);
responseData.push(["xinfoGroups(key9)", []]);
baseTransaction.xgroupCreate(key9, groupName1, "0-0");
responseData.push(['xgroupCreate(key9, groupName1, "0-0")', "OK"]);
baseTransaction.xgroupCreate(key9, groupName2, "0-0", { mkStream: true });
Expand Down

0 comments on commit 1195c82

Please sign in to comment.