Skip to content

Commit

Permalink
Added XDEL command
Browse files Browse the repository at this point in the history
Signed-off-by: Guian Gumpac <guian.gumpac@improving.com>
  • Loading branch information
GumpacG committed Aug 1, 2024
1 parent efb6577 commit 5254113
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 4 deletions.
21 changes: 21 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ import {
createType,
createUnlink,
createXAdd,
createXDel,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -3486,6 +3487,26 @@ export class BaseClient {
return this.createWritePromise(createXAdd(key, values, options));
}

/**
* Removes the specified entries by id from a stream, and returns the number of entries deleted.
*
* See https://valkey.io/commands/xdel for more details.
*
* @param key - The key of the stream.
* @param ids - An array of entry ids.
* @returns The number of entries removed from the stream. This number may be less than the number of entries in
* `ids`, if the specified `ids` don't exist in the stream.
*
* @example
* ```typescript
* console.log(await client.xdel("key", ["1538561698944-0", "1538561698944-1"]));
* // Output is 2 since the stream marked 2 entries as deleted.
* ```
*/
public xdel(key: string, ids: string[]): Promise<string> {
return this.createWritePromise(createXDel(key, ids));
}

/**
* Trims the stream stored at `key` by evicting older entries.
* See https://valkey.io/commands/xtrim/ for more details.
Expand Down
13 changes: 13 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,9 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) {
}
}

/**
* @internal
*/
export function createXAdd(
key: string,
values: [string, string][],
Expand Down Expand Up @@ -1962,6 +1965,16 @@ export function createXAdd(
return createCommand(RequestType.XAdd, args);
}

/**
* @internal
*/
export function createXDel(
key: string,
ids: string[],
): command_request.Command {
return createCommand(RequestType.XDel, [key, ...ids]);
}

/**
* @internal
*/
Expand Down
27 changes: 23 additions & 4 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ import {
createType,
createUnlink,
createXAdd,
createXDel,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -1972,7 +1973,8 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
*
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*
* Command Response - The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public xadd(
key: string,
Expand All @@ -1982,13 +1984,29 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXAdd(key, values, options));
}

/**
* Removes the specified entries by id from a stream, and returns the number of entries deleted.
*
* See https://valkey.io/commands/xdel for more details.
*
* @param key - The key of the stream.
* @param ids - An array of entry ids.
*
* Command Response - The number of entries removed from the stream. This number may be less than the number of entries in
* `ids`, if the specified `ids` don't exist in the stream.
*/
public xdel(key: string, ids: string[]): T {
return this.addAndReturn(createXDel(key, ids));
}

/**
* Trims the stream stored at `key` by evicting older entries.
* See https://valkey.io/commands/xtrim/ for more details.
*
* @param key - the key of the stream
* @param options - options detailing how to trim the stream.
* @returns The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
*
* Command Response - The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
*/
public xtrim(key: string, options: StreamTrimOptions): T {
return this.addAndReturn(createXTrim(key, options));
Expand All @@ -1997,7 +2015,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
/** Returns the server time.
* See https://valkey.io/commands/time/ for details.
*
* @returns - The current server time as a two items `array`:
* Command Response - The current server time as a two items `array`:
* A Unix timestamp and the amount of microseconds already elapsed in the current second.
* The returned `array` is in a [Unix timestamp, Microseconds already elapsed] format.
*/
Expand All @@ -2011,7 +2029,8 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
*
* @param keys_and_ids - pairs of keys and entry ids to read from. A pair is composed of a stream's key and the id of the entry after which the stream will be read.
* @param options - options detailing how to read the stream.
* @returns A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format.
*
* Command Response - A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format.
*/
public xread(
keys_and_ids: Record<string, string>,
Expand Down
58 changes: 58 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6366,6 +6366,64 @@ export function runBaseTests<Context>(config: {
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`xdel test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const stringKey = uuidv4();
const nonExistentKey = uuidv4();
const streamId1 = "0-1";
const streamId2 = "0-2";
const streamId3 = "0-3";

expect(
await client.xadd(
key,
[
["f1", "foo1"],
["f2", "foo2"],
],
{ id: streamId1 },
),
).toEqual(streamId1);

expect(
await client.xadd(
key,
[
["f1", "foo1"],
["f2", "foo2"],
],
{ id: streamId2 },
),
).toEqual(streamId2);

expect(await client.xlen(key)).toEqual(2);

// deletes one stream id, and ignores anything invalid
expect(await client.xdel(key, [streamId1, streamId3])).toEqual(
1,
);
expect(await client.xdel(nonExistentKey, [streamId3])).toEqual(
0,
);

// invalid argument - id list should not be empty
await expect(client.xdel(key, [])).rejects.toThrow(
RequestError,
);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(
client.xdel(stringKey, [streamId3]),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);
}

export function runCommonTests<Context>(config: {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ export async function transactionTest(
'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }',
1,
]);
baseTransaction.xdel(key9, ["0-3", "0-5"]);
responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]);
baseTransaction.rename(key9, key10);
responseData.push(["rename(key9, key10)", "OK"]);
baseTransaction.exists([key10]);
Expand Down

0 comments on commit 5254113

Please sign in to comment.