Skip to content

Commit

Permalink
Node: add command XACK (valkey-io#2112)
Browse files Browse the repository at this point in the history
Signed-off-by: TJ Zhang <tj.zhang@improving.com>
Co-authored-by: TJ Zhang <tj.zhang@improving.com>
  • Loading branch information
tjzhang-BQ and TJ Zhang authored Aug 20, 2024
1 parent 866d47a commit c397ce7
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))
* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
31 changes: 31 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ import {
createUnlink,
createWait,
createWatch,
createXAck,
createXAdd,
createXAutoClaim,
createXClaim,
Expand Down Expand Up @@ -5183,6 +5184,36 @@ export class BaseClient {
preferReplica: connection_request.ReadFrom.PreferReplica,
};

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param ids - An array of entry ids.
* @returns The number of messages that were successfully acknowledged.
*
* @example
* ```typescript
* <pre>{@code
* const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
* // read messages from streamId
* const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
* // acknowledge messages on stream
* console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
* </pre>
* ```
*/
public async xack(
key: string,
group: string,
ids: string[],
): Promise<number> {
return this.createWritePromise(createXAck(key, group, ids));
}

/** Returns the element at index `index` in the list stored at `key`.
* The index is zero-based, so 0 means the first element, 1 the second element and so on.
* Negative indices can be used to designate elements starting at the tail of the list.
Expand Down
11 changes: 11 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3942,3 +3942,14 @@ export function createGetEx(

return createCommand(RequestType.GetEx, args);
}

/**
* @internal
*/
export function createXAck(
key: string,
group: string,
ids: string[],
): command_request.Command {
return createCommand(RequestType.XAck, [key, group, ...ids]);
}
17 changes: 17 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ import {
createType,
createUnlink,
createWait,
createXAck,
createXAdd,
createXAutoClaim,
createXClaim,
Expand Down Expand Up @@ -2892,6 +2893,22 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
);
}

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param ids - An array of entry ids.
*
* Command Response - The number of messages that were successfully acknowledged.
*/
public xack(key: string, group: string, ids: string[]): T {
return this.addAndReturn(createXAck(key, group, ids));
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
113 changes: 113 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9834,6 +9834,119 @@ export function runBaseTests(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xack test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = "{testKey}:1-" + uuidv4();
const nonExistingKey = "{testKey}:2-" + uuidv4();
const string_key = "{testKey}:3-" + uuidv4();
const groupName = uuidv4();
const consumerName = uuidv4();
const stream_id0 = "0";
const stream_id1_0 = "1-0";
const stream_id1_1 = "1-1";
const stream_id1_2 = "1-2";

// setup: add 2 entries to the stream, create consumer group and read to mark them as pending
expect(
await client.xadd(key, [["f0", "v0"]], {
id: stream_id1_0,
}),
).toEqual(stream_id1_0);
expect(
await client.xadd(key, [["f1", "v1"]], {
id: stream_id1_1,
}),
).toEqual(stream_id1_1);
expect(
await client.xgroupCreate(key, groupName, stream_id0),
).toBe("OK");
expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toEqual({
[key]: {
[stream_id1_0]: [["f0", "v0"]],
[stream_id1_1]: [["f1", "v1"]],
},
});

// add one more entry
expect(
await client.xadd(key, [["f2", "v2"]], {
id: stream_id1_2,
}),
).toEqual(stream_id1_2);

// acknowledge the first 2 entries
expect(
await client.xack(key, groupName, [
stream_id1_0,
stream_id1_1,
]),
).toBe(2);

// attempt to acknowledge the first 2 entries again, returns 0 since they were already acknowledged
expect(
await client.xack(key, groupName, [
stream_id1_0,
stream_id1_1,
]),
).toBe(0);

// read the last unacknowledged entry
expect(
await client.xreadgroup(groupName, consumerName, {
[key]: ">",
}),
).toEqual({ [key]: { [stream_id1_2]: [["f2", "v2"]] } });

// deleting the consumer, returns 1 since the last entry still hasn't been acknowledged
expect(
await client.xgroupDelConsumer(
key,
groupName,
consumerName,
),
).toBe(1);

// attempt to acknowledge a non-existing key, returns 0
expect(
await client.xack(nonExistingKey, groupName, [
stream_id1_0,
]),
).toBe(0);

// attempt to acknowledge a non-existing group name, returns 0
expect(
await client.xack(key, "nonExistingGroup", [stream_id1_0]),
).toBe(0);

// attempt to acknowledge a non-existing ID, returns 0
expect(await client.xack(key, groupName, ["99-99"])).toBe(0);

// invalid argument - ID list must not be empty
await expect(client.xack(key, groupName, [])).rejects.toThrow(
RequestError,
);

// invalid argument - invalid stream ID format
await expect(
client.xack(key, groupName, ["invalid stream ID format"]),
).rejects.toThrow(RequestError);

// key exists, but is not a stream
expect(await client.set(string_key, "xack")).toBe("OK");
await expect(
client.xack(string_key, groupName, [stream_id1_0]),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lmpop test_%p`,
async (protocol) => {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,8 @@ export async function transactionTest(
]);
}

baseTransaction.xack(key9, groupName1, ["0-3"]);
responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
baseTransaction.xgroupDestroy(key9, groupName1);
Expand Down

0 comments on commit c397ce7

Please sign in to comment.