Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: add BZMPOP command #2018

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* Node: Added FCALL and FCALL_RO commands ([#2011](https://github.com/valkey-io/valkey-glide/pull/2011))
* Node: Added ZMPOP command ([#1994](https://github.com/valkey-io/valkey-glide/pull/1994))
* Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009))
* Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018))

#### Fixes
* Java: Add overloads for XADD to allow duplicate entry keys ([#1970](https://github.com/valkey-io/valkey-glide/pull/1970))
Expand Down
51 changes: 47 additions & 4 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
createBRPop,
createBitCount,
createBitOp,
createBZMPop,
createBitPos,
createDecr,
createDecrBy,
Expand Down Expand Up @@ -3673,7 +3674,7 @@ export class BaseClient {
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
* @returns A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
Expand All @@ -3688,12 +3689,54 @@ export class BaseClient {
* // Output: [ "zSet1", { three: 3, two: 2 } ] - "three" with score 3 and "two" with score 2 were popped from "zSet1".
* ```
*/
public zmpop(
key: string[],
public async zmpop(
keys: string[],
modifier: ScoreFilter,
count?: number,
): Promise<[string, [Record<string, number>]] | null> {
return this.createWritePromise(createZMPop(keys, modifier, count));
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys` being
* checked in the order they are provided. Blocks the connection when there are no members
* to pop from any of the given sorted sets. `BZMPOP` is the blocking variant of {@link zmpop}.
*
* See https://valkey.io/commands/bzmpop/ for more details.
*
* @remarks
* 1. When in cluster mode, all `keys` must map to the same hash slot.
* 2. `BZMPOP` is a client blocking command, see {@link https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands | the wiki}
* for more details and best practices.
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param timeout - The number of seconds to wait for a blocking operation to complete.
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
* A value of 0 will block indefinitely.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
* @returns A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*
* @example
* ```typescript
* await client.zadd("zSet1", { one: 1.0, two: 2.0, three: 3.0 });
* await client.zadd("zSet2", { four: 4.0 });
* console.log(await client.bzmpop(["zSet1", "zSet2"], ScoreFilter.MAX, 0.1, 2));
* // Output: [ "zSet1", { three: 3, two: 2 } ] - "three" with score 3 and "two" with score 2 were popped from "zSet1".
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
* ```
*/
public async bzmpop(
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): Promise<[string, [Record<string, number>]] | null> {
return this.createWritePromise(createZMPop(key, modifier, count));
return this.createWritePromise(
createBZMPop(keys, modifier, timeout, count),
);
}

/**
Expand Down
24 changes: 24 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2083,6 +2083,30 @@ export function createZMPop(
return createCommand(RequestType.ZMPop, args);
}

/**
* @internal
*/
export function createBZMPop(
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): command_request.Command {
const args: string[] = [
timeout.toString(),
keys.length.toString(),
...keys,
modifier,
];

if (count !== undefined) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (count !== undefined) {
if (count) {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that with non-number types however, we can't do that with number types because it can be 0 which would be treated as false and behaves differently.

args.push("COUNT");
args.push(count.toString());
}

return createCommand(RequestType.BZMPop, args);
}

/**
* @internal
*/
Expand Down
34 changes: 33 additions & 1 deletion node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
ZAddOptions,
createBLPop,
createBRPop,
createBZMPop,
createBitCount,
createBitOp,
createBitPos,
Expand Down Expand Up @@ -2170,7 +2171,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
*
* Command Response - A two-element `array` containing the key name of the set from which the
* element was popped, and a member-score `Record` of the popped element.
Expand All @@ -2182,6 +2183,37 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createZMPop(keys, modifier, count));
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys` being
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
* checked in the order they are provided. Blocks the connection when there are no members
* to pop from any of the given sorted sets. `BZMPOP` is the blocking variant of {@link zmpop}.
*
* See https://valkey.io/commands/bzmpop/ for more details.
*
* @remarks `BZMPOP` is a client blocking command, see {@link https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands | the wiki}
* for more details and best practices.
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param timeout - The number of seconds to wait for a blocking operation to complete.
* A value of 0 will block indefinitely.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
*
* Command Response - A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*/
public bzmpop(
GumpacG marked this conversation as resolved.
Show resolved Hide resolved
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): T {
return this.addAndReturn(createBZMPop(keys, modifier, timeout, count));
}

/**
* Increments the score of `member` in the sorted set stored at `key` by `increment`.
* If `member` does not exist in the sorted set, it is added with `increment` as its score.
Expand Down
1 change: 1 addition & 0 deletions node/tests/RedisClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ describe("GlideClusterClient", () => {
client.sintercard(["abc", "zxy", "lkn"]),
client.zintercard(["abc", "zxy", "lkn"]),
client.zmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX),
client.bzmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX, 0.1),
);
}

Expand Down
86 changes: 86 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5001,6 +5001,92 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`bzmpop test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) return;
const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const nonExistingKey = "{key}-0" + uuidv4();
const stringKey = "{key}-string" + uuidv4();

expect(await client.zadd(key1, { a1: 1, b1: 2 })).toEqual(2);
expect(await client.zadd(key2, { a2: 0.1, b2: 0.2 })).toEqual(
2,
);

checkSimple(
await client.bzmpop([key1, key2], ScoreFilter.MAX, 0.1),
).toEqual([key1, { b1: 2 }]);
checkSimple(
await client.bzmpop([key2, key1], ScoreFilter.MAX, 0.1, 10),
).toEqual([key2, { a2: 0.1, b2: 0.2 }]);

// ensure that command doesn't time out even if timeout > request timeout (250ms by default)
expect(
await client.bzmpop([nonExistingKey], ScoreFilter.MAX, 0.5),
).toBeNull;
expect(
await client.bzmpop(
[nonExistingKey],
ScoreFilter.MAX,
0.55,
1,
),
).toBeNull;

// key exists, but it is not a sorted set
expect(await client.set(stringKey, "value")).toEqual("OK");
await expect(
client.bzmpop([stringKey], ScoreFilter.MAX, 0.1),
).rejects.toThrow(RequestError);
await expect(
client.bzmpop([stringKey], ScoreFilter.MAX, 0.1, 1),
).rejects.toThrow(RequestError);

// incorrect argument: key list should not be empty
await expect(
client.bzmpop([], ScoreFilter.MAX, 0.1, 1),
).rejects.toThrow(RequestError);

// incorrect argument: count should be greater than 0
await expect(
client.bzmpop([key1], ScoreFilter.MAX, 0.1, 0),
).rejects.toThrow(RequestError);

GumpacG marked this conversation as resolved.
Show resolved Hide resolved
// incorrect argument: timeout can not be a negative number
await expect(
client.bzmpop([key1], ScoreFilter.MAX, -1, 10),
).rejects.toThrow(RequestError);

// check that order of entries in the response is preserved
const entries: Record<string, number> = {};

for (let i = 0; i < 10; i++) {
// a0 => 0, a1 => 1 etc
entries["a" + i] = i;
}

expect(await client.zadd(key2, entries)).toEqual(10);
const result = await client.bzmpop(
[key2],
ScoreFilter.MIN,
0.1,
10,
);

if (result) {
expect(result[1]).toEqual(entries);
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
}

// TODO: add test case with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
aaron-congo marked this conversation as resolved.
Show resolved Hide resolved
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`geodist test_%p`,
async (protocol) => {
Expand Down
12 changes: 12 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,18 @@ export async function transactionTest(
responseData.push(["zmpop([key14], MAX)", [key14, { two: 2.0 }]]);
baseTransaction.zmpop([key14], ScoreFilter.MAX, 1);
responseData.push(["zmpop([key14], MAX, 1)", [key14, { one: 1.0 }]]);
baseTransaction.zadd(key14, { one: 1.0, two: 2.0 });
responseData.push(["zadd(key14, { one: 1.0, two: 2.0 })", 2]);
baseTransaction.bzmpop([key14], ScoreFilter.MAX, 0.1);
responseData.push([
"bzmpop([key14], ScoreFilter.MAX, 0.1)",
[key14, { two: 2.0 }],
]);
baseTransaction.bzmpop([key14], ScoreFilter.MAX, 0.1, 1);
responseData.push([
"bzmpop([key14], ScoreFilter.MAX, 0.1, 1)",
[key14, { one: 1.0 }],
]);
}

baseTransaction.xadd(key9, [["field", "value1"]], { id: "0-1" });
Expand Down
Loading