Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Added XDEL command #2064

Merged
merged 5 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* Node: Added ZREMRANGEBYLEX command ([#2025]((https://github.com/valkey-io/valkey-glide/pull/2025))
* Node: Added ZSCAN command ([#2061](https://github.com/valkey-io/valkey-glide/pull/2061))
* Node: Added SETRANGE command ([#2066](https://github.com/valkey-io/valkey-glide/pull/2066))
* Node: Added XDEL command ([#2064]((https://github.com/valkey-io/valkey-glide/pull/2064))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
21 changes: 21 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ import {
createType,
createUnlink,
createXAdd,
createXDel,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -3489,6 +3490,26 @@ export class BaseClient {
return this.createWritePromise(createXAdd(key, values, options));
}

/**
* Removes the specified entries by id from a stream, and returns the number of entries deleted.
*
* See https://valkey.io/commands/xdel for more details.
*
* @param key - The key of the stream.
* @param ids - An array of entry ids.
* @returns The number of entries removed from the stream. This number may be less than the number of entries in
* `ids`, if the specified `ids` don't exist in the stream.
*
* @example
* ```typescript
* console.log(await client.xdel("key", ["1538561698944-0", "1538561698944-1"]));
* // Output is 2 since the stream marked 2 entries as deleted.
* ```
*/
public xdel(key: string, ids: string[]): Promise<number> {
return this.createWritePromise(createXDel(key, ids));
}

/**
* Trims the stream stored at `key` by evicting older entries.
* See https://valkey.io/commands/xtrim/ for more details.
Expand Down
13 changes: 13 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1933,6 +1933,9 @@ function addTrimOptions(options: StreamTrimOptions, args: string[]) {
}
}

/**
* @internal
*/
export function createXAdd(
key: string,
values: [string, string][],
Expand Down Expand Up @@ -1962,6 +1965,16 @@ export function createXAdd(
return createCommand(RequestType.XAdd, args);
}

/**
* @internal
*/
export function createXDel(
key: string,
ids: string[],
): command_request.Command {
return createCommand(RequestType.XDel, [key, ...ids]);
}

/**
* @internal
*/
Expand Down
28 changes: 24 additions & 4 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ import {
createType,
createUnlink,
createXAdd,
createXDel,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -1984,7 +1985,9 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
*
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
* @param options - (Optional) Stream add options.
*
* Command Response - The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
*/
public xadd(
key: string,
Expand All @@ -1994,13 +1997,29 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXAdd(key, values, options));
}

/**
* Removes the specified entries by id from a stream, and returns the number of entries deleted.
*
* See https://valkey.io/commands/xdel for more details.
*
* @param key - The key of the stream.
* @param ids - An array of entry ids.
*
* Command Response - The number of entries removed from the stream. This number may be less than the number of entries in
* `ids`, if the specified `ids` don't exist in the stream.
*/
public xdel(key: string, ids: string[]): T {
return this.addAndReturn(createXDel(key, ids));
}

/**
* Trims the stream stored at `key` by evicting older entries.
* See https://valkey.io/commands/xtrim/ for more details.
*
* @param key - the key of the stream
* @param options - options detailing how to trim the stream.
* @returns The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
*
* Command Response - The number of entries deleted from the stream. If `key` doesn't exist, 0 is returned.
*/
public xtrim(key: string, options: StreamTrimOptions): T {
return this.addAndReturn(createXTrim(key, options));
Expand All @@ -2009,7 +2028,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
/** Returns the server time.
* See https://valkey.io/commands/time/ for details.
*
* @returns - The current server time as a two items `array`:
* Command Response - The current server time as a two items `array`:
* A Unix timestamp and the amount of microseconds already elapsed in the current second.
* The returned `array` is in a [Unix timestamp, Microseconds already elapsed] format.
*/
Expand All @@ -2023,7 +2042,8 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
*
* @param keys_and_ids - pairs of keys and entry ids to read from. A pair is composed of a stream's key and the id of the entry after which the stream will be read.
* @param options - options detailing how to read the stream.
* @returns A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format.
*
* Command Response - A map between a stream key, and an array of entries in the matching key. The entries are in an [id, fields[]] format.
*/
public xread(
keys_and_ids: Record<string, string>,
Expand Down
76 changes: 67 additions & 9 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5941,11 +5941,8 @@ export function runBaseTests<Context>(config: {

// Setup test data - use a large number of entries to force an iterative cursor.
const numberMap: Record<string, number> = {};
const expectedNumberMapArray: string[] = [];

for (let i = 0; i < 10000; i++) {
expectedNumberMapArray.push(i.toString());
expectedNumberMapArray.push(i.toString());
for (let i = 0; i < 50000; i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be its own PR ;)
I'll let it slide this time

numberMap[i.toString()] = i;
}

Expand Down Expand Up @@ -6018,15 +6015,18 @@ export function runBaseTests<Context>(config: {
}

// Fetching by cursor is randomized.
const expectedCombinedMapArray =
expectedNumberMapArray.concat(expectedCharMapArray);
const expectedFullMap: Record<string, number> = {
...numberMap,
...charMap,
};

expect(fullResultMapArray.length).toEqual(
expectedCombinedMapArray.length,
Object.keys(expectedFullMap).length * 2,
);

for (let i = 0; i < fullResultMapArray.length; i += 2) {
expect(fullResultMapArray).toContain(
expectedCombinedMapArray[i],
expect(fullResultMapArray[i] in expectedFullMap).toEqual(
true,
);
}

Expand Down Expand Up @@ -6544,6 +6544,64 @@ export function runBaseTests<Context>(config: {
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
`xdel test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const stringKey = uuidv4();
const nonExistentKey = uuidv4();
const streamId1 = "0-1";
const streamId2 = "0-2";
const streamId3 = "0-3";

expect(
await client.xadd(
key,
[
["f1", "foo1"],
["f2", "foo2"],
],
{ id: streamId1 },
),
).toEqual(streamId1);

expect(
await client.xadd(
key,
[
["f1", "foo1"],
["f2", "foo2"],
],
{ id: streamId2 },
),
).toEqual(streamId2);

expect(await client.xlen(key)).toEqual(2);

// deletes one stream id, and ignores anything invalid
expect(await client.xdel(key, [streamId1, streamId3])).toEqual(
1,
);
expect(await client.xdel(nonExistentKey, [streamId3])).toEqual(
0,
);

// invalid argument - id list should not be empty
await expect(client.xdel(key, [])).rejects.toThrow(
RequestError,
);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(
client.xdel(stringKey, [streamId3]),
).rejects.toThrow(RequestError);
}, protocol);
},
config.timeout,
);
}

export function runCommonTests<Context>(config: {
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,8 @@ export async function transactionTest(
'xtrim(key9, { method: "minid", threshold: "0-2", exact: true }',
1,
]);
baseTransaction.xdel(key9, ["0-3", "0-5"]);
responseData.push(["xdel(key9, [['0-3', '0-5']])", 1]);
baseTransaction.rename(key9, key10);
responseData.push(["rename(key9, key10)", "OK"]);
baseTransaction.exists([key10]);
Expand Down
Loading