From ce69945ded80b57d1680c345c50d743661d42620 Mon Sep 17 00:00:00 2001 From: Yury-Fridlyand Date: Fri, 9 Aug 2024 11:41:40 -0700 Subject: [PATCH] Node: Add `XPENDING` command. (#2085) * Add `XPENDING` command. Signed-off-by: Yury-Fridlyand --- CHANGELOG.md | 1 + node/npm/glide/index.ts | 4 +- node/src/BaseClient.ts | 73 +++++++++++++++++++ node/src/Commands.ts | 138 +++++++++++++++++++++++++----------- node/src/Transaction.ts | 38 ++++++++++ node/tests/SharedTests.ts | 100 ++++++++++++++++++++++++++ node/tests/TestUtilities.ts | 14 ++++ 7 files changed, 327 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d8b35ef31..4531e039ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085)) * Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093)) * Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096)) * Node: Added FUNCTION STATS commands ([#2082](https://github.com/valkey-io/valkey-glide/pull/2082)) diff --git a/node/npm/glide/index.ts b/node/npm/glide/index.ts index a78c0bdd0c..a4686e774b 100644 --- a/node/npm/glide/index.ts +++ b/node/npm/glide/index.ts @@ -140,6 +140,7 @@ function initialize() { StreamAddOptions, StreamReadOptions, StreamClaimOptions, + StreamPendingOptions, ScriptOptions, ClosingError, ConfigurationError, @@ -232,8 +233,9 @@ function initialize() { StreamGroupOptions, StreamTrimOptions, StreamAddOptions, - StreamReadOptions, StreamClaimOptions, + StreamReadOptions, + StreamPendingOptions, ScriptOptions, ClosingError, ConfigurationError, diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index f2bbe54e4a..549f81ea37 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -48,6 +48,7 @@ import { StreamAddOptions, StreamClaimOptions, StreamGroupOptions, + StreamPendingOptions, StreamReadOptions, StreamTrimOptions, ZAddOptions, @@ -170,6 +171,7 @@ import { createXGroupCreateConsumer, createXGroupDelConsumer, createXLen, + createXPending, createXRead, createXTrim, createZAdd, @@ -3925,6 +3927,77 @@ export class BaseClient { return this.createWritePromise(createXLen(key)); } + /** + * Returns stream message summary information for pending messages matching a given range of IDs. + * + * See https://valkey.io/commands/xpending/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @returns An `array` that includes the summary of the pending messages. See example for more details. + * @example + * ```typescript + * console.log(await client.xpending("my_stream", "my_group")); // Output: + * // [ + * // 42, // The total number of pending messages + * // "1722643465939-0", // The smallest ID among the pending messages + * // "1722643484626-0", // The greatest ID among the pending messages + * // [ // A 2D-`array` of every consumer in the group + * // [ "consumer1", "10" ], // with at least one pending message, and the + * // [ "consumer2", "32" ], // number of pending messages it has + * // ] + * // ] + * ``` + */ + public async xpending( + key: string, + group: string, + ): Promise<[number, string, string, [string, number][]]> { + return this.createWritePromise(createXPending(key, group)); + } + + /** + * Returns an extended form of stream message information for pending messages matching a given range of IDs. + * + * See https://valkey.io/commands/xpending/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param options - Additional options to filter entries, see {@link StreamPendingOptions}. + * @returns A 2D-`array` of 4-tuples containing extended message information. See example for more details. + * + * @example + * ```typescript + * console.log(await client.xpending("my_stream", "my_group"), { + * start: { value: "0-1", isInclusive: true }, + * end: InfScoreBoundary.PositiveInfinity, + * count: 2, + * consumer: "consumer1" + * }); // Output: + * // [ + * // [ + * // "1722643465939-0", // The ID of the message + * // "consumer1", // The name of the consumer that fetched the message and has still to acknowledge it + * // 174431, // The number of milliseconds that elapsed since the last time this message was delivered to this consumer + * // 1 // The number of times this message was delivered + * // ], + * // [ + * // "1722643484626-0", + * // "consumer1", + * // 202231, + * // 1 + * // ] + * // ] + * ``` + */ + public async xpendingWithOptions( + key: string, + group: string, + options: StreamPendingOptions, + ): Promise<[string, string, number, number][]> { + return this.createWritePromise(createXPending(key, group, options)); + } + /** * Returns the list of all consumers and their attributes for the given consumer group of the * stream stored at `key`. diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 489eb198a1..ca00417ac9 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -1628,37 +1628,52 @@ type SortedSetRange = { export type RangeByScore = SortedSetRange & { type: "byScore" }; export type RangeByLex = SortedSetRange & { type: "byLex" }; -/** - * Returns a string representation of a score boundary in Redis protocol format. - * @param score - The score boundary object containing value and inclusivity - * information. - * @param isLex - Indicates whether to return lexical representation for - * positive/negative infinity. - * @returns A string representation of the score boundary in Redis protocol - * format. - */ +/** Returns a string representation of a score boundary as a command argument. */ function getScoreBoundaryArg( score: ScoreBoundary | ScoreBoundary, - isLex: boolean = false, ): string { - if (score == InfScoreBoundary.PositiveInfinity) { - return ( - InfScoreBoundary.PositiveInfinity.toString() + (isLex ? "" : "inf") - ); + if (typeof score === "string") { + // InfScoreBoundary + return score + "inf"; } - if (score == InfScoreBoundary.NegativeInfinity) { - return ( - InfScoreBoundary.NegativeInfinity.toString() + (isLex ? "" : "inf") - ); + if (score.isInclusive == false) { + return "(" + score.value.toString(); + } + + return score.value.toString(); +} + +/** Returns a string representation of a lex boundary as a command argument. */ +function getLexBoundaryArg( + score: ScoreBoundary | ScoreBoundary, +): string { + if (typeof score === "string") { + // InfScoreBoundary + return score; + } + + if (score.isInclusive == false) { + return "(" + score.value.toString(); + } + + return "[" + score.value.toString(); +} + +/** Returns a string representation of a stream boundary as a command argument. */ +function getStreamBoundaryArg( + score: ScoreBoundary | ScoreBoundary, +): string { + if (typeof score === "string") { + // InfScoreBoundary + return score; } if (score.isInclusive == false) { return "(" + score.value.toString(); } - const value = isLex ? "[" + score.value.toString() : score.value.toString(); - return value; + return score.value.toString(); } function createZRangeArgs( @@ -1671,10 +1686,20 @@ function createZRangeArgs( if (typeof rangeQuery.start != "number") { rangeQuery = rangeQuery as RangeByScore | RangeByLex; - const isLex = rangeQuery.type == "byLex"; - args.push(getScoreBoundaryArg(rangeQuery.start, isLex)); - args.push(getScoreBoundaryArg(rangeQuery.stop, isLex)); - args.push(isLex == true ? "BYLEX" : "BYSCORE"); + + if (rangeQuery.type == "byLex") { + args.push( + getLexBoundaryArg(rangeQuery.start), + getLexBoundaryArg(rangeQuery.stop), + "BYLEX", + ); + } else { + args.push( + getScoreBoundaryArg(rangeQuery.start), + getScoreBoundaryArg(rangeQuery.stop), + "BYSCORE", + ); + } } else { args.push(rangeQuery.start.toString()); args.push(rangeQuery.stop.toString()); @@ -1707,9 +1732,11 @@ export function createZCount( minScore: ScoreBoundary, maxScore: ScoreBoundary, ): command_request.Command { - const args = [key]; - args.push(getScoreBoundaryArg(minScore)); - args.push(getScoreBoundaryArg(maxScore)); + const args = [ + key, + getScoreBoundaryArg(minScore), + getScoreBoundaryArg(maxScore), + ]; return createCommand(RequestType.ZCount, args); } @@ -1862,11 +1889,7 @@ export function createZRemRangeByLex( minLex: ScoreBoundary, maxLex: ScoreBoundary, ): command_request.Command { - const args = [ - key, - getScoreBoundaryArg(minLex, true), - getScoreBoundaryArg(maxLex, true), - ]; + const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)]; return createCommand(RequestType.ZRemRangeByLex, args); } @@ -1878,12 +1901,15 @@ export function createZRemRangeByScore( minScore: ScoreBoundary, maxScore: ScoreBoundary, ): command_request.Command { - const args = [key]; - args.push(getScoreBoundaryArg(minScore)); - args.push(getScoreBoundaryArg(maxScore)); + const args = [ + key, + getScoreBoundaryArg(minScore), + getScoreBoundaryArg(maxScore), + ]; return createCommand(RequestType.ZRemRangeByScore, args); } +/** @internal */ export function createPersist(key: string): command_request.Command { return createCommand(RequestType.Persist, [key]); } @@ -1896,11 +1922,7 @@ export function createZLexCount( minLex: ScoreBoundary, maxLex: ScoreBoundary, ): command_request.Command { - const args = [ - key, - getScoreBoundaryArg(minLex, true), - getScoreBoundaryArg(maxLex, true), - ]; + const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)]; return createCommand(RequestType.ZLexCount, args); } @@ -2417,6 +2439,42 @@ export function createXLen(key: string): command_request.Command { return createCommand(RequestType.XLen, [key]); } +/** Optional arguments for {@link BaseClient.xpendingWithOptions|xpending}. */ +export type StreamPendingOptions = { + /** Filter pending entries by their idle time - in milliseconds */ + minIdleTime?: number; + /** Starting stream ID bound for range. */ + start: ScoreBoundary; + /** Ending stream ID bound for range. */ + end: ScoreBoundary; + /** Limit the number of messages returned. */ + count: number; + /** Filter pending entries by consumer. */ + consumer?: string; +}; + +/** @internal */ +export function createXPending( + key: string, + group: string, + options?: StreamPendingOptions, +): command_request.Command { + const args = [key, group]; + + if (options) { + if (options.minIdleTime !== undefined) + args.push("IDLE", options.minIdleTime.toString()); + args.push( + getStreamBoundaryArg(options.start), + getStreamBoundaryArg(options.end), + options.count.toString(), + ); + if (options.consumer) args.push(options.consumer); + } + + return createCommand(RequestType.XPending, args); +} + /** @internal */ export function createXInfoConsumers( key: string, diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 4b74520910..e4fe1f5674 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -54,6 +54,7 @@ import { StreamAddOptions, StreamClaimOptions, StreamGroupOptions, + StreamPendingOptions, StreamReadOptions, StreamTrimOptions, ZAddOptions, @@ -205,6 +206,7 @@ import { createXInfoConsumers, createXInfoStream, createXLen, + createXPending, createXRead, createXTrim, createZAdd, @@ -2312,6 +2314,9 @@ export class BaseTransaction> { } /** + * Returns stream message summary information for pending messages matching a given range of IDs. + * + * See https://valkey.io/commands/xpending/ for more details. * Returns the list of all consumers and their attributes for the given consumer group of the * stream stored at `key`. * @@ -2320,6 +2325,39 @@ export class BaseTransaction> { * @param key - The key of the stream. * @param group - The consumer group name. * + * Command Response - An `array` that includes the summary of the pending messages. + * See example of {@link BaseClient.xpending|xpending} for more details. + */ + public xpending(key: string, group: string): T { + return this.addAndReturn(createXPending(key, group)); + } + + /** + * Returns stream message summary information for pending messages matching a given range of IDs. + * + * See https://valkey.io/commands/xpending/ for more details. + * + * @param key - The key of the stream. + * @param group - The consumer group name. + * @param options - Additional options to filter entries, see {@link StreamPendingOptions}. + * + * Command Response - A 2D-`array` of 4-tuples containing extended message information. + * See example of {@link BaseClient.xpendingWithOptions|xpendingWithOptions} for more details. + */ + public xpendingWithOptions( + key: string, + group: string, + options: StreamPendingOptions, + ): T { + return this.addAndReturn(createXPending(key, group, options)); + } + + /** + * Returns the list of all consumers and their attributes for the given consumer group of the + * stream stored at `key`. + * + * See https://valkey.io/commands/xinfo-consumers/ for more details. + * * Command Response - An `Array` of `Records`, where each mapping contains the attributes * of a consumer for the given consumer group of the stream at `key`. */ diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 1bfdbada9d..47c6e1cbb6 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -7678,6 +7678,106 @@ export function runBaseTests(config: { config.timeout, ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `xpending test_%p`, + async (protocol) => { + await runTest(async (client: BaseClient) => { + const key = uuidv4(); + const group = uuidv4(); + + expect( + await client.xgroupCreate(key, group, "0", { + mkStream: true, + }), + ).toEqual("OK"); + expect( + await client.customCommand([ + "xgroup", + "createconsumer", + key, + group, + "consumer", + ]), + ).toEqual(true); + + expect( + await client.xadd( + key, + [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + { id: "0-1" }, + ), + ).toEqual("0-1"); + expect( + await client.xadd( + key, + [["entry2_field1", "entry2_value1"]], + { id: "0-2" }, + ), + ).toEqual("0-2"); + + expect( + await client.customCommand([ + "xreadgroup", + "group", + group, + "consumer", + "STREAMS", + key, + ">", + ]), + ).toEqual({ + [key]: { + "0-1": [ + ["entry1_field1", "entry1_value1"], + ["entry1_field2", "entry1_value2"], + ], + "0-2": [["entry2_field1", "entry2_value1"]], + }, + }); + + // wait to get some minIdleTime + await new Promise((resolve) => setTimeout(resolve, 500)); + + expect(await client.xpending(key, group)).toEqual([ + 2, + "0-1", + "0-2", + [["consumer", "2"]], + ]); + + const result = await client.xpendingWithOptions(key, group, { + start: InfScoreBoundary.NegativeInfinity, + end: InfScoreBoundary.PositiveInfinity, + count: 1, + minIdleTime: 42, + }); + result[0][2] = 0; // overwrite msec counter to avoid test flakyness + expect(result).toEqual([["0-1", "consumer", 0, 1]]); + + // not existing consumer + expect( + await client.xpendingWithOptions(key, group, { + start: { value: "0-1", isInclusive: true }, + end: { value: "0-2", isInclusive: false }, + count: 12, + consumer: "_", + }), + ).toEqual([]); + + // key exists, but it is not a stream + const stringKey = uuidv4(); + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect(client.xpending(stringKey, "_")).rejects.toThrow( + RequestError, + ); + }, protocol); + }, + config.timeout, + ); + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( `xclaim test_%p`, async (protocol) => { diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index ec2090f520..9660d06809 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -983,6 +983,20 @@ export async function transactionTest( 'xreadgroup(groupName1, "consumer1", key9, >)', { [key9]: { "0-2": [["field", "value2"]] } }, ]); + baseTransaction.xpending(key9, groupName1); + responseData.push([ + "xpending(key9, groupName1)", + [1, "0-2", "0-2", [["consumer1", "1"]]], + ]); + baseTransaction.xpendingWithOptions(key9, groupName1, { + start: InfScoreBoundary.NegativeInfinity, + end: InfScoreBoundary.PositiveInfinity, + count: 10, + }); + responseData.push([ + "xpending(key9, groupName1, -, +, 10)", + [["0-2", "consumer1", 0, 1]], + ]); baseTransaction.xclaim(key9, groupName1, "consumer1", 0, ["0-2"]); responseData.push([ 'xclaim(key9, groupName1, "consumer1", 0, ["0-2"])',