Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add XINFO GROUPS command. #2122

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XINFO GROUPS command ([#2122](https://github.com/valkey-io/valkey-glide/pull/2122))
* Node: Added XAUTOCLAIM command ([#2108](https://github.com/valkey-io/valkey-glide/pull/2108))
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
Expand Down
50 changes: 43 additions & 7 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,18 @@ import {
BitOffsetMultiplier, // eslint-disable-line @typescript-eslint/no-unused-vars
BitOffsetOptions,
BitmapIndexType,
BitwiseOperation,
CoordOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars
BitwiseOperation, // eslint-disable-line @typescript-eslint/no-unused-vars
ExpireOptions,
GeoAddOptions,
GeoBoxShape, // eslint-disable-line @typescript-eslint/no-unused-vars
GeoCircleShape, // eslint-disable-line @typescript-eslint/no-unused-vars
GeoSearchResultOptions,
GeoSearchShape,
GeoSearchStoreResultOptions,
GeoUnit,
GeospatialData,
InsertPosition,
KeyWeight, // eslint-disable-line @typescript-eslint/no-unused-vars
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

confirm that this should be removed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to suppress this eslint check if we import a type to refer in docs. KeyWeight is used as a function argument, so we can remove it.

KeyWeight,
LPosOptions,
ListDirection,
MemberOrigin, // eslint-disable-line @typescript-eslint/no-unused-vars
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
ListDirection, // eslint-disable-line @typescript-eslint/no-unused-vars
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
RangeByIndex,
RangeByLex,
RangeByScore,
Expand Down Expand Up @@ -170,6 +166,7 @@ import {
createXGroupDelConsumer,
createXGroupDestroy,
createXInfoConsumers,
createXInfoGroups,
createXInfoStream,
createXLen,
createXPending,
Expand Down Expand Up @@ -4118,6 +4115,45 @@ export class BaseClient {
return this.createWritePromise(createXInfoConsumers(key, group));
}

/**
* Returns the list of all consumer groups and their attributes for the stream stored at `key`.
*
* See https://valkey.io/commands/xinfo-groups/ for more details.
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
*
* @param key - The key of the stream.
* @returns An `Array` of `Records`, where each mapping represents the
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
* attributes of a consumer group for the stream at `key`.
* @example
* ```typescript
* <pre>{@code
* const result = await client.xinfoGroups("my_stream");
* console.log(result); // Output:
* // [
* // {
* // "name": "mygroup",
* // "consumers": 2,
* // "pending": 2,
* // "last-delivered-id": "1638126030001-0",
* // "entries-read": 2, // Added in version 7.0.0
* // "lag": 0 // Added in version 7.0.0
* // },
* // {
* // "name": "some-other-group",
* // "consumers": 1,
* // "pending": 0,
* // "last-delivered-id": "0-0",
* // "entries-read": null, // Added in version 7.0.0
* // "lag": 1 // Added in version 7.0.0
* // }
* // ]
* ```
*/
public async xinfoGroups(
key: string,
): Promise<Record<string, string | number | null>[]> {
return this.createWritePromise(createXInfoGroups(key));
}

/**
* Changes the ownership of a pending message.
*
Expand Down
5 changes: 5 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2431,6 +2431,11 @@ export function createXInfoStream(
return createCommand(RequestType.XInfoStream, args);
}

/** @internal */
export function createXInfoGroups(key: string): command_request.Command {
return createCommand(RequestType.XInfoGroups, [key]);
}

/**
* @internal
*/
Expand Down
15 changes: 15 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ import {
createXGroupDelConsumer,
createXGroupDestroy,
createXInfoConsumers,
createXInfoGroups,
createXInfoStream,
createXLen,
createXPending,
Expand Down Expand Up @@ -2275,6 +2276,20 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXInfoStream(key, fullOptions ?? false));
}

/**
* Returns the list of all consumer groups and their attributes for the stream stored at `key`.
*
* See https://valkey.io/commands/xinfo-groups/ for more details.
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
*
* @param key - The key of the stream.
*
* Command Response - An `Array` of `Records`, where each mapping represents the
* attributes of a consumer group for the stream at `key`.
*/
public xinfoGroups(key: string): T {
return this.addAndReturn(createXInfoGroups(key));
}

/** Returns the server time.
* See https://valkey.io/commands/time/ for details.
*
Expand Down
192 changes: 191 additions & 1 deletion node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5076,7 +5076,6 @@ export function runBaseTests<Context>(config: {
"last-entry": (string | number | string[])[];
groups: number;
};
console.log(result);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


// verify result:
expect(result.length).toEqual(1);
Expand Down Expand Up @@ -7706,6 +7705,197 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xinfogroups xinfo groups %p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster) => {
const key = uuidv4();
const stringKey = uuidv4();
const groupName1 = uuidv4();
const consumer1 = uuidv4();
const streamId1 = "0-1";
const streamId2 = "0-2";
const streamId3 = "0-3";

expect(
await client.xgroupCreate(key, groupName1, "0-0", {
mkStream: true,
}),
).toEqual("OK");

// one empty group exists
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
},
]
: [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
"entries-read": null,
lag: 0,
},
],
);

expect(
await client.xadd(
key,
[
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
{ id: streamId1 },
),
).toEqual(streamId1);

expect(
await client.xadd(
key,
[
["entry2_field1", "entry2_value1"],
["entry2_field2", "entry2_value2"],
],
{ id: streamId2 },
),
).toEqual(streamId2);

expect(
await client.xadd(
key,
[["entry3_field1", "entry3_value1"]],
{ id: streamId3 },
),
).toEqual(streamId3);

// same as previous check, bug lag = 3, there are 3 messages unread
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
},
]
: [
{
name: groupName1,
consumers: 0,
pending: 0,
"last-delivered-id": "0-0",
"entries-read": null,
lag: 3,
},
],
);

expect(
await client.customCommand([
"XREADGROUP",
"GROUP",
groupName1,
consumer1,
"STREAMS",
key,
">",
]),
).toEqual({
[key]: {
[streamId1]: [
["entry1_field1", "entry1_value1"],
["entry1_field2", "entry1_value2"],
],
[streamId2]: [
["entry2_field1", "entry2_value1"],
["entry2_field2", "entry2_value2"],
],
[streamId3]: [["entry3_field1", "entry3_value1"]],
},
});
// after reading, `lag` is reset, and `pending`, consumer count and last ID are set
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 1,
pending: 3,
"last-delivered-id": streamId3,
},
]
: [
{
name: groupName1,
consumers: 1,
pending: 3,
"last-delivered-id": streamId3,
"entries-read": 3,
lag: 0,
},
],
);

expect(
await client.customCommand([
"XACK",
key,
groupName1,
streamId1,
]),
).toEqual(1);
// once message ack'ed, pending counter decreased
expect(await client.xinfoGroups(key)).toEqual(
cluster.checkIfServerVersionLessThan("7.0.0")
? [
{
name: groupName1,
consumers: 1,
pending: 2,
"last-delivered-id": streamId3,
},
]
: [
{
name: groupName1,
consumers: 1,
pending: 2,
"last-delivered-id": streamId3,
"entries-read": 3,
lag: 0,
},
],
);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(client.xinfoGroups(stringKey)).rejects.toThrow(
RequestError,
);

// Passing a non-existing key raises an error
const key2 = uuidv4();
await expect(client.xinfoGroups(key2)).rejects.toThrow(
RequestError,
);
// create a second stream
await client.xadd(key2, [["a", "b"]]);
// no group yet exists
expect(await client.xinfoGroups(key2)).toEqual([]);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xpending test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,8 @@ export async function transactionTest(
'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }',
1,
]);
baseTransaction.xinfoGroups(key9);
responseData.push(["xinfoGroups(key9)", []]);
baseTransaction.xgroupCreate(key9, groupName1, "0-0");
responseData.push(['xgroupCreate(key9, groupName1, "0-0")', "OK"]);
baseTransaction.xgroupCreate(key9, groupName2, "0-0", { mkStream: true });
Expand Down
Loading