From 165dcbceb5a96a8c9dd86300c8a6e99af27d5fba Mon Sep 17 00:00:00 2001 From: Aaron <69273634+aaron-congo@users.noreply.github.com> Date: Wed, 12 Jun 2024 15:21:52 -0700 Subject: [PATCH] Node: add XLEN command (#1555) --- CHANGELOG.md | 1 + node/src/BaseClient.ts | 19 +++++++++++++++ node/src/Commands.ts | 7 ++++++ node/src/Transaction.ts | 14 +++++++++++ node/tests/SharedTests.ts | 47 +++++++++++++++++++++++++++++++------ node/tests/TestUtilities.ts | 2 ++ 6 files changed, 83 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 522baa4a9a..eee8c8f28f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ * Node: Added PFCOUNT command ([#1545](https://github.com/aws/glide-for-redis/pull/1545)) * Node: Added OBJECT FREQ command ([#1542](https://github.com/aws/glide-for-redis/pull/1542)) * Node: Added LINSERT command ([#1544](https://github.com/aws/glide-for-redis/pull/1544)) +* Node: Added XLEN command ([#1555](https://github.com/aws/glide-for-redis/pull/1555)) ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/node/src/BaseClient.ts b/node/src/BaseClient.ts index a3b50f1c27..12e24eced5 100644 --- a/node/src/BaseClient.ts +++ b/node/src/BaseClient.ts @@ -99,6 +99,7 @@ import { createZRemRangeByScore, createZScore, createSUnionStore, + createXLen, } from "./Commands"; import { ClosingError, @@ -2271,6 +2272,24 @@ export class BaseClient { return this.createWritePromise(createXRead(keys_and_ids, options)); } + /** + * Returns the number of entries in the stream stored at `key`. + * + * See https://valkey.io/commands/xlen/ for more details. + * + * @param key - The key of the stream. + * @returns The number of entries in the stream. If `key` does not exist, returns `0`. + * + * @example + * ```typescript + * const numEntries = await client.xlen("my_stream"); + * console.log(numEntries); // Output: 2 - "my_stream" contains 2 entries. + * ``` + */ + public xlen(key: string): Promise { + return this.createWritePromise(createXLen(key)); + } + private readonly MAP_READ_FROM_STRATEGY: Record< ReadFrom, connection_request.ReadFrom diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 1c55f0df36..a8ecdbf8e0 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -1418,6 +1418,13 @@ export function createXRead( return createCommand(RequestType.XRead, args); } +/** + * @internal + */ +export function createXLen(key: string): redis_request.Command { + return createCommand(RequestType.XLen, [key]); +} + /** * @internal */ diff --git a/node/src/Transaction.ts b/node/src/Transaction.ts index 5041b36637..f2f6bb1e03 100644 --- a/node/src/Transaction.ts +++ b/node/src/Transaction.ts @@ -104,6 +104,7 @@ import { createZRemRangeByScore, createZScore, createSUnionStore, + createXLen, } from "./Commands"; import { redis_request } from "./ProtobufMessage"; @@ -1346,6 +1347,19 @@ export class BaseTransaction> { return this.addAndReturn(createXRead(keys_and_ids, options)); } + /** + * Returns the number of entries in the stream stored at `key`. + * + * See https://valkey.io/commands/xlen/ for more details. + * + * @param key - The key of the stream. + * + * Command Response - The number of entries in the stream. If `key` does not exist, returns `0`. + */ + public xlen(key: string): T { + return this.addAndReturn(createXLen(key)); + } + /** * Renames `key` to `newkey`. * If `newkey` already exists it is overwritten. diff --git a/node/tests/SharedTests.ts b/node/tests/SharedTests.ts index 01c2ffdb44..89ec2054b5 100644 --- a/node/tests/SharedTests.ts +++ b/node/tests/SharedTests.ts @@ -2467,10 +2467,12 @@ export function runBaseTests(config: { ); it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( - `streams add and trim test_%p`, - async () => { + `streams add, trim, and len test_%p`, + async (protocol) => { await runTest(async (client: BaseClient) => { const key = uuidv4(); + const nonExistingKey = uuidv4(); + const stringKey = uuidv4(); const field1 = uuidv4(); const field2 = uuidv4(); @@ -2501,7 +2503,7 @@ export function runBaseTests(config: { [field2, "bar2"], ]), ).not.toBeNull(); - expect(await client.customCommand(["XLEN", key])).toEqual(2); + expect(await client.xlen(key)).toEqual(2); // this will trim the first entry. const id = await client.xadd( @@ -2519,7 +2521,7 @@ export function runBaseTests(config: { }, ); expect(id).not.toBeNull(); - expect(await client.customCommand(["XLEN", key])).toEqual(2); + expect(await client.xlen(key)).toEqual(2); // this will trim the 2nd entry. expect( @@ -2538,7 +2540,7 @@ export function runBaseTests(config: { }, ), ).not.toBeNull(); - expect(await client.customCommand(["XLEN", key])).toEqual(2); + expect(await client.xlen(key)).toEqual(2); expect( await client.xtrim(key, { @@ -2547,8 +2549,39 @@ export function runBaseTests(config: { exact: true, }), ).toEqual(1); - expect(await client.customCommand(["XLEN", key])).toEqual(1); - }, ProtocolVersion.RESP2); + expect(await client.xlen(key)).toEqual(1); + + expect( + await client.xtrim(key, { + method: "maxlen", + threshold: 0, + exact: true, + }), + ).toEqual(1); + // Unlike other Redis collection types, stream keys still exist even after removing all entries + expect(await client.exists([key])).toEqual(1); + expect(await client.xlen(key)).toEqual(0); + + expect( + await client.xtrim(nonExistingKey, { + method: "maxlen", + threshold: 1, + exact: true, + }), + ).toEqual(0); + expect(await client.xlen(nonExistingKey)).toEqual(0); + + // key exists, but it is not a stream + expect(await client.set(stringKey, "foo")).toEqual("OK"); + await expect( + client.xtrim(stringKey, { + method: "maxlen", + threshold: 1, + exact: true, + }), + ).rejects.toThrow(); + await expect(client.xlen(stringKey)).rejects.toThrow(); + }, protocol); }, config.timeout, ); diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index c56f6965a6..b2f4cda3a2 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -387,6 +387,8 @@ export async function transactionTest( args.push("0-2"); baseTransaction.xadd(key9, [["field", "value3"]], { id: "0-3" }); args.push("0-3"); + baseTransaction.xlen(key9); + args.push(3); baseTransaction.xread({ [key9]: "0-1" }); args.push({ [key9]: {