Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node: Add XPENDING command. #2085

Merged
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Node: Added XPENDING commands ([#2085](https://github.com/valkey-io/valkey-glide/pull/2085))
* Node: Added XINFO CONSUMERS command ([#2093](https://github.com/valkey-io/valkey-glide/pull/2093))
* Node: Added HRANDFIELD command ([#2096](https://github.com/valkey-io/valkey-glide/pull/2096))
* Node: Added FUNCTION STATS commands ([#2082](https://github.com/valkey-io/valkey-glide/pull/2082))
Expand Down
4 changes: 3 additions & 1 deletion node/npm/glide/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ function initialize() {
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down Expand Up @@ -232,8 +233,9 @@ function initialize() {
StreamGroupOptions,
StreamTrimOptions,
StreamAddOptions,
StreamReadOptions,
StreamClaimOptions,
StreamReadOptions,
StreamPendingOptions,
ScriptOptions,
ClosingError,
ConfigurationError,
Expand Down
73 changes: 73 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import {
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
Expand Down Expand Up @@ -170,6 +171,7 @@ import {
createXGroupCreateConsumer,
createXGroupDelConsumer,
createXLen,
createXPending,
createXRead,
createXTrim,
createZAdd,
Expand Down Expand Up @@ -3925,6 +3927,77 @@ export class BaseClient {
return this.createWritePromise(createXLen(key));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @returns An `array` that includes the summary of the pending messages. See example for more details.
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group")); // Output:
* // [
* // 42, // The total number of pending messages
* // "1722643465939-0", // The smallest ID among the pending messages
* // "1722643484626-0", // The greatest ID among the pending messages
* // [ // A 2D-`array` of every consumer in the group
* // [ "consumer1", "10" ], // with at least one pending message, and the
* // [ "consumer2", "32" ], // number of pending messages it has
* // ]
* // ]
* ```
*/
public async xpending(
key: string,
group: string,
): Promise<[number, string, string, [string, number][]]> {
return this.createWritePromise(createXPending(key, group));
}

/**
* Returns an extended form of stream message information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
* @returns A 2D-`array` of 4-tuples containing extended message information. See example for more details.
*
* @example
* ```typescript
* console.log(await client.xpending("my_stream", "my_group"), {
* start: { value: "0-1", isInclusive: true },
* end: InfScoreBoundary.PositiveInfinity,
* count: 2,
* consumer: "consumer1"
* }); // Output:
* // [
* // [
* // "1722643465939-0", // The ID of the message
* // "consumer1", // The name of the consumer that fetched the message and has still to acknowledge it
* // 174431, // The number of milliseconds that elapsed since the last time this message was delivered to this consumer
* // 1 // The number of times this message was delivered
* // ],
* // [
* // "1722643484626-0",
* // "consumer1",
* // 202231,
* // 1
* // ]
* // ]
* ```
*/
public async xpendingWithOptions(
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
key: string,
group: string,
options: StreamPendingOptions,
): Promise<[string, string, number, number][]> {
return this.createWritePromise(createXPending(key, group, options));
}

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
Expand Down
138 changes: 98 additions & 40 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1628,37 +1628,52 @@ type SortedSetRange<T> = {
export type RangeByScore = SortedSetRange<number> & { type: "byScore" };
export type RangeByLex = SortedSetRange<string> & { type: "byLex" };

/**
* Returns a string representation of a score boundary in Redis protocol format.
* @param score - The score boundary object containing value and inclusivity
* information.
* @param isLex - Indicates whether to return lexical representation for
* positive/negative infinity.
* @returns A string representation of the score boundary in Redis protocol
* format.
*/
/** Returns a string representation of a score boundary as a command argument. */
function getScoreBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
isLex: boolean = false,
): string {
if (score == InfScoreBoundary.PositiveInfinity) {
return (
InfScoreBoundary.PositiveInfinity.toString() + (isLex ? "" : "inf")
);
if (typeof score === "string") {
// InfScoreBoundary
return score + "inf";
}

if (score == InfScoreBoundary.NegativeInfinity) {
return (
InfScoreBoundary.NegativeInfinity.toString() + (isLex ? "" : "inf")
);
if (score.isInclusive == false) {
return "(" + score.value.toString();
}

return score.value.toString();
}

/** Returns a string representation of a lex boundary as a command argument. */
function getLexBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
): string {
if (typeof score === "string") {
// InfScoreBoundary
return score;
}

if (score.isInclusive == false) {
return "(" + score.value.toString();
}

return "[" + score.value.toString();
}

/** Returns a string representation of a stream boundary as a command argument. */
function getStreamBoundaryArg(
score: ScoreBoundary<number> | ScoreBoundary<string>,
): string {
if (typeof score === "string") {
// InfScoreBoundary
return score;
}

if (score.isInclusive == false) {
return "(" + score.value.toString();
}

const value = isLex ? "[" + score.value.toString() : score.value.toString();
return value;
return score.value.toString();
}

function createZRangeArgs(
Expand All @@ -1671,10 +1686,20 @@ function createZRangeArgs(

if (typeof rangeQuery.start != "number") {
rangeQuery = rangeQuery as RangeByScore | RangeByLex;
const isLex = rangeQuery.type == "byLex";
args.push(getScoreBoundaryArg(rangeQuery.start, isLex));
args.push(getScoreBoundaryArg(rangeQuery.stop, isLex));
args.push(isLex == true ? "BYLEX" : "BYSCORE");

if (rangeQuery.type == "byLex") {
args.push(
getLexBoundaryArg(rangeQuery.start),
getLexBoundaryArg(rangeQuery.stop),
"BYLEX",
);
} else {
args.push(
getScoreBoundaryArg(rangeQuery.start),
getScoreBoundaryArg(rangeQuery.stop),
"BYSCORE",
);
}
} else {
args.push(rangeQuery.start.toString());
args.push(rangeQuery.stop.toString());
Expand Down Expand Up @@ -1707,9 +1732,11 @@ export function createZCount(
minScore: ScoreBoundary<number>,
maxScore: ScoreBoundary<number>,
): command_request.Command {
const args = [key];
args.push(getScoreBoundaryArg(minScore));
args.push(getScoreBoundaryArg(maxScore));
const args = [
key,
getScoreBoundaryArg(minScore),
getScoreBoundaryArg(maxScore),
];
return createCommand(RequestType.ZCount, args);
}

Expand Down Expand Up @@ -1862,11 +1889,7 @@ export function createZRemRangeByLex(
minLex: ScoreBoundary<string>,
maxLex: ScoreBoundary<string>,
): command_request.Command {
const args = [
key,
getScoreBoundaryArg(minLex, true),
getScoreBoundaryArg(maxLex, true),
];
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
return createCommand(RequestType.ZRemRangeByLex, args);
}

Expand All @@ -1878,12 +1901,15 @@ export function createZRemRangeByScore(
minScore: ScoreBoundary<number>,
maxScore: ScoreBoundary<number>,
): command_request.Command {
const args = [key];
args.push(getScoreBoundaryArg(minScore));
args.push(getScoreBoundaryArg(maxScore));
const args = [
key,
getScoreBoundaryArg(minScore),
getScoreBoundaryArg(maxScore),
];
return createCommand(RequestType.ZRemRangeByScore, args);
}

/** @internal */
export function createPersist(key: string): command_request.Command {
return createCommand(RequestType.Persist, [key]);
}
Expand All @@ -1896,11 +1922,7 @@ export function createZLexCount(
minLex: ScoreBoundary<string>,
maxLex: ScoreBoundary<string>,
): command_request.Command {
const args = [
key,
getScoreBoundaryArg(minLex, true),
getScoreBoundaryArg(maxLex, true),
];
const args = [key, getLexBoundaryArg(minLex), getLexBoundaryArg(maxLex)];
return createCommand(RequestType.ZLexCount, args);
}

Expand Down Expand Up @@ -2417,6 +2439,42 @@ export function createXLen(key: string): command_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/** Optional arguments for {@link BaseClient.xpendingWithOptions|xpending}. */
export type StreamPendingOptions = {
/** Filter pending entries by their idle time - in milliseconds */
minIdleTime?: number;
Yury-Fridlyand marked this conversation as resolved.
Show resolved Hide resolved
/** Starting stream ID bound for range. */
start: ScoreBoundary<string>;
/** Ending stream ID bound for range. */
end: ScoreBoundary<string>;
/** Limit the number of messages returned. */
count: number;
/** Filter pending entries by consumer. */
consumer?: string;
};

/** @internal */
export function createXPending(
key: string,
group: string,
options?: StreamPendingOptions,
): command_request.Command {
const args = [key, group];

if (options) {
if (options.minIdleTime !== undefined)
args.push("IDLE", options.minIdleTime.toString());
args.push(
getStreamBoundaryArg(options.start),
getStreamBoundaryArg(options.end),
options.count.toString(),
);
if (options.consumer) args.push(options.consumer);
}

return createCommand(RequestType.XPending, args);
}

/** @internal */
export function createXInfoConsumers(
key: string,
Expand Down
38 changes: 38 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import {
StreamAddOptions,
StreamClaimOptions,
StreamGroupOptions,
StreamPendingOptions,
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
Expand Down Expand Up @@ -205,6 +206,7 @@ import {
createXInfoConsumers,
createXInfoStream,
createXLen,
createXPending,
createXRead,
createXTrim,
createZAdd,
Expand Down Expand Up @@ -2312,6 +2314,9 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
*
Expand All @@ -2320,6 +2325,39 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param key - The key of the stream.
* @param group - The consumer group name.
*
* Command Response - An `array` that includes the summary of the pending messages.
* See example of {@link BaseClient.xpending|xpending} for more details.
*/
public xpending(key: string, group: string): T {
return this.addAndReturn(createXPending(key, group));
}

/**
* Returns stream message summary information for pending messages matching a given range of IDs.
*
* See https://valkey.io/commands/xpending/ for more details.
*
* @param key - The key of the stream.
* @param group - The consumer group name.
* @param options - Additional options to filter entries, see {@link StreamPendingOptions}.
*
* Command Response - A 2D-`array` of 4-tuples containing extended message information.
* See example of {@link BaseClient.xpendingWithOptions|xpendingWithOptions} for more details.
*/
public xpendingWithOptions(
key: string,
group: string,
options: StreamPendingOptions,
): T {
return this.addAndReturn(createXPending(key, group, options));
}

/**
* Returns the list of all consumers and their attributes for the given consumer group of the
* stream stored at `key`.
*
* See https://valkey.io/commands/xinfo-consumers/ for more details.
*
* Command Response - An `Array` of `Records`, where each mapping contains the attributes
* of a consumer for the given consumer group of the stream at `key`.
*/
Expand Down
Loading
Loading