From 5254113240accf4e2fb6a2a1249a4cd99b9484c5 Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Wed, 31 Jul 2024 17:11:13 -0700 Subject: [PATCH] Added XDEL command Signed-off-by: Guian Gumpac --- node/src/BaseClient.ts | 21 ++++++++++++++ node/src/Commands.ts | 13 +++++++++ node/src/Transaction.ts | 27 ++++++++++++++--- node/tests/SharedTests.ts | 58 +++++++++++++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 2 ++ 5 files changed, 117 insertions(+), 4 deletions(-) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index 0bac9ea262..711f1a3d0e 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -140,6 +140,7 @@ import { createType, createUnlink, createXAdd, + createXDel, createXLen, createXRead, createXTrim, @@ -3486,6 +3487,26 @@ export class BaseClient { return this.createWritePromise(createXAdd(key, values, options)); } + /** + * Removes the specified entries by id from a stream, and returns the number of entries deleted. + * + * See https://valkey.io/commands/xdel for more details. + * + * @param key - The key of the stream. + * @param ids - An array of entry ids. + * @returns The number of entries removed from the stream. This number may be less than the number of entries in + * `ids`, if the specified `ids` don't exist in the stream. + * + * @example + * ```typescript + * console.log(await client.xdel("key", ["1538561698944-0", "1538561698944-1"])); + * // Output is 2 since the stream marked 2 entries as deleted. + * ``` + */ + public xdel(key: string, ids: string[]): Promise { + return this.createWritePromise(createXDel(key, ids)); + } + /** * Trims the stream stored at `key` by evicting older entries. * See https://valkey.io/commands/xtrim/ for more details. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index f23b972459..15e9da0e36 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -1933,6 +1933,9 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) { } } +/** + * @internal + */ export function createXAdd( key: string, values: [string, string][], @@ -1962,6 +1965,16 @@ export function createXAdd( return createCommand(RequestType.XAdd, args); } +/** + * @internal + */ +export function createXDel( + key: string, + ids: string[], +): command_request.Command { + return createCommand(RequestType.XDel, [key, ...ids]); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 79d6d7362b..f62d9da947 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -160,6 +160,7 @@ import { createType, createUnlink, createXAdd, + createXDel, createXLen, createXRead, createXTrim, @@ -1972,7 +1973,8 @@ export class BaseTransaction> { * * @param key - The key of the stream. * @param values - field-value pairs to be added to the entry. - * @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. + * + * Command Response - The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists. */ public xadd( key: string, @@ -1982,13 +1984,29 @@ export class BaseTransaction> { return this.addAndReturn(createXAdd(key, values, options)); } + /** + * Removes the specified entries by id from a stream, and returns the number of entries deleted. + * + * See https://valkey.io/commands/xdel for more details. + * + * @param key - The key of the stream. + * @param ids - An array of entry ids. + * + * Command Response - The number of entries removed from the stream. This number may be less than the number of entries in + * `ids`, if the specified `ids` don't exist in the stream. + */ + public xdel(key: string, ids: string[]): T { + return this.addAndReturn(createXDel(key, ids)); + } + /** * Trims the stream stored at `key` by evicting older entries. * See https://valkey.io/commands/xtrim/ for more details. * * @param key - the key of the stream * @param options - options detailing how to trim the stream. - * @returns The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. + * + * Command Response - The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned. */ public xtrim(key: string, options: StreamTrimOptions): T { return this.addAndReturn(createXTrim(key, options)); @@ -1997,7 +2015,7 @@ export class BaseTransaction> { /** Returns the server time. * See https://valkey.io/commands/time/ for details. * - * @returns - The current server time as a two items `array`: + * Command Response - The current server time as a two items `array`: * A Unix timestamp and the amount of microseconds already elapsed in the current second. * The returned `array` is in a [Unix timestamp, Microseconds already elapsed] format. */ @@ -2011,7 +2029,8 @@ export class BaseTransaction> { * * @param keys_and_ids - pairs of keys and entry ids to read from. A pair is composed of a stream's key and the id of the entry after which the stream will be read. * @param options - options detailing how to read the stream. - * @returns A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format. + * + * Command Response - A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format. */ public xread( keys_and_ids: Record, diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index e6876170b9..d144ea8239 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -6366,6 +6366,64 @@ export function runBaseTests(config: { }, config.timeout, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xdel test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const stringKey = uuidv4(); + const nonExistentKey = uuidv4(); + const streamId1 = "0-1"; + const streamId2 = "0-2"; + const streamId3 = "0-3"; + + expect( + await client.xadd( + key, + [ + ["f1", "foo1"], + ["f2", "foo2"], + ], + { id: streamId1 }, + ), + ).toEqual(streamId1); + + expect( + await client.xadd( + key, + [ + ["f1", "foo1"], + ["f2", "foo2"], + ], + { id: streamId2 }, + ), + ).toEqual(streamId2); + + expect(await client.xlen(key)).toEqual(2); + + // deletes one stream id, and ignores anything invalid + expect(await client.xdel(key, [streamId1, streamId3])).toEqual( + 1, + ); + expect(await client.xdel(nonExistentKey, [streamId3])).toEqual( + 0, + ); + + // invalid argument - id list should not be empty + await expect(client.xdel(key, [])).rejects.toThrow( + RequestError, + ); + + // key exists, but it is not a stream + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect( + client.xdel(stringKey, [streamId3]), + ).rejects.toThrow(RequestError); + }, protocol); + }, + config.timeout, + ); } export function runCommonTests(config: { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index 99f53b854d..6b63a18b9e 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -826,6 +826,8 @@ export async function transactionTest( 'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }', 1, ]); + baseTransaction.xdel(key9, ["0-3", "0-5"]); + responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]); baseTransaction.rename(key9, key10); responseData.push(["rename(key9, key10)", "OK"]); baseTransaction.exists([key10]);