diff --git a/CHANGELOG.md b/CHANGELOG.md
index af4e2672e4..0d4d56abe7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -88,6 +88,7 @@
* Node: Added XGROUP CREATECONSUMER & XGROUP DELCONSUMER commands ([#2088](https://github.com/valkey-io/valkey-glide/pull/2088))
* Node: Added GETEX command ([#2107]((https://github.com/valkey-io/valkey-glide/pull/2107))
* Node: Added ZINTER and ZUNION commands ([#2146](https://github.com/aws/glide-for-redis/pull/2146))
+* Node: Added XACK commands ([#2112](https://github.com/valkey-io/valkey-glide/pull/2112))
#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts
index 68156ef365..1c4b24c87e 100644
--- a/node/src/BaseClient.ts
+++ b/node/src/BaseClient.ts
@@ -171,6 +171,7 @@ import {
createUnlink,
createWait,
createWatch,
+ createXAck,
createXAdd,
createXAutoClaim,
createXClaim,
@@ -5183,6 +5184,36 @@ export class BaseClient {
preferReplica: connection_request.ReadFrom.PreferReplica,
};
+ /**
+ * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+ * This command should be called on a pending message so that such message does not get processed again.
+ *
+ * @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
+ *
+ * @param key - The key of the stream.
+ * @param group - The consumer group name.
+ * @param ids - An array of entry ids.
+ * @returns The number of messages that were successfully acknowledged.
+ *
+ * @example
+ * ```typescript
+ *
{@code
+ * const entryId = await client.xadd("mystream", ["myfield", "mydata"]);
+ * // read messages from streamId
+ * const readResult = await client.xreadgroup(["myfield", "mydata"], "mygroup", "my0consumer");
+ * // acknowledge messages on stream
+ * console.log(await client.xack("mystream", "mygroup", [entryId])); // Output: 1L
+ *
+ * ```
+ */
+ public async xack(
+ key: string,
+ group: string,
+ ids: string[],
+ ): Promise {
+ return this.createWritePromise(createXAck(key, group, ids));
+ }
+
/** Returns the element at index `index` in the list stored at `key`.
* The index is zero-based, so 0 means the first element, 1 the second element and so on.
* Negative indices can be used to designate elements starting at the tail of the list.
diff --git a/node/src/Commands.ts b/node/src/Commands.ts
index a75c3a7f4e..3f127fd1bf 100644
--- a/node/src/Commands.ts
+++ b/node/src/Commands.ts
@@ -3942,3 +3942,14 @@ export function createGetEx(
return createCommand(RequestType.GetEx, args);
}
+
+/**
+ * @internal
+ */
+export function createXAck(
+ key: string,
+ group: string,
+ ids: string[],
+): command_request.Command {
+ return createCommand(RequestType.XAck, [key, group, ...ids]);
+}
diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts
index 8d5d633fc9..2fd3fa5e02 100644
--- a/node/src/Transaction.ts
+++ b/node/src/Transaction.ts
@@ -207,6 +207,7 @@ import {
createType,
createUnlink,
createWait,
+ createXAck,
createXAdd,
createXAutoClaim,
createXClaim,
@@ -2892,6 +2893,22 @@ export class BaseTransaction> {
);
}
+ /**
+ * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+ * This command should be called on a pending message so that such message does not get processed again.
+ *
+ * @see {@link https://valkey.io/commands/xack/|valkey.io} for details.
+ *
+ * @param key - The key of the stream.
+ * @param group - The consumer group name.
+ * @param ids - An array of entry ids.
+ *
+ * Command Response - The number of messages that were successfully acknowledged.
+ */
+ public xack(key: string, group: string, ids: string[]): T {
+ return this.addAndReturn(createXAck(key, group, ids));
+ }
+
/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts
index 3d815e3b57..62bf853a71 100644
--- a/node/tests/SharedTests.ts
+++ b/node/tests/SharedTests.ts
@@ -9834,6 +9834,119 @@ export function runBaseTests(config: {
config.timeout,
);
+ it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
+ `xack test_%p`,
+ async (protocol) => {
+ await runTest(async (client: BaseClient) => {
+ const key = "{testKey}:1-" + uuidv4();
+ const nonExistingKey = "{testKey}:2-" + uuidv4();
+ const string_key = "{testKey}:3-" + uuidv4();
+ const groupName = uuidv4();
+ const consumerName = uuidv4();
+ const stream_id0 = "0";
+ const stream_id1_0 = "1-0";
+ const stream_id1_1 = "1-1";
+ const stream_id1_2 = "1-2";
+
+ // setup: add 2 entries to the stream, create consumer group and read to mark them as pending
+ expect(
+ await client.xadd(key, [["f0", "v0"]], {
+ id: stream_id1_0,
+ }),
+ ).toEqual(stream_id1_0);
+ expect(
+ await client.xadd(key, [["f1", "v1"]], {
+ id: stream_id1_1,
+ }),
+ ).toEqual(stream_id1_1);
+ expect(
+ await client.xgroupCreate(key, groupName, stream_id0),
+ ).toBe("OK");
+ expect(
+ await client.xreadgroup(groupName, consumerName, {
+ [key]: ">",
+ }),
+ ).toEqual({
+ [key]: {
+ [stream_id1_0]: [["f0", "v0"]],
+ [stream_id1_1]: [["f1", "v1"]],
+ },
+ });
+
+ // add one more entry
+ expect(
+ await client.xadd(key, [["f2", "v2"]], {
+ id: stream_id1_2,
+ }),
+ ).toEqual(stream_id1_2);
+
+ // acknowledge the first 2 entries
+ expect(
+ await client.xack(key, groupName, [
+ stream_id1_0,
+ stream_id1_1,
+ ]),
+ ).toBe(2);
+
+ // attempt to acknowledge the first 2 entries again, returns 0 since they were already acknowledged
+ expect(
+ await client.xack(key, groupName, [
+ stream_id1_0,
+ stream_id1_1,
+ ]),
+ ).toBe(0);
+
+ // read the last unacknowledged entry
+ expect(
+ await client.xreadgroup(groupName, consumerName, {
+ [key]: ">",
+ }),
+ ).toEqual({ [key]: { [stream_id1_2]: [["f2", "v2"]] } });
+
+ // deleting the consumer, returns 1 since the last entry still hasn't been acknowledged
+ expect(
+ await client.xgroupDelConsumer(
+ key,
+ groupName,
+ consumerName,
+ ),
+ ).toBe(1);
+
+ // attempt to acknowledge a non-existing key, returns 0
+ expect(
+ await client.xack(nonExistingKey, groupName, [
+ stream_id1_0,
+ ]),
+ ).toBe(0);
+
+ // attempt to acknowledge a non-existing group name, returns 0
+ expect(
+ await client.xack(key, "nonExistingGroup", [stream_id1_0]),
+ ).toBe(0);
+
+ // attempt to acknowledge a non-existing ID, returns 0
+ expect(await client.xack(key, groupName, ["99-99"])).toBe(0);
+
+ // invalid argument - ID list must not be empty
+ await expect(client.xack(key, groupName, [])).rejects.toThrow(
+ RequestError,
+ );
+
+ // invalid argument - invalid stream ID format
+ await expect(
+ client.xack(key, groupName, ["invalid stream ID format"]),
+ ).rejects.toThrow(RequestError);
+
+ // key exists, but is not a stream
+ expect(await client.set(string_key, "xack")).toBe("OK");
+ await expect(
+ client.xack(string_key, groupName, [stream_id1_0]),
+ ).rejects.toThrow(RequestError);
+ }, protocol);
+ },
+ config.timeout,
+ );
+
it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`lmpop test_%p`,
async (protocol) => {
diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts
index 4605f89aa9..b6680eb685 100644
--- a/node/tests/TestUtilities.ts
+++ b/node/tests/TestUtilities.ts
@@ -1215,6 +1215,8 @@ export async function transactionTest(
]);
}
+ baseTransaction.xack(key9, groupName1, ["0-3"]);
+ responseData.push(["xack(key9, groupName1, ['0-3'])", 0]);
baseTransaction.xgroupDelConsumer(key9, groupName1, consumer);
responseData.push(["xgroupDelConsumer(key9, groupName1, consumer)", 1]);
baseTransaction.xgroupDestroy(key9, groupName1);