Skip to content

Commit

Permalink
Node: add XLEN command (#1555)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo authored Jun 12, 2024
1 parent bd233de commit 2c8b8ff
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* Node: Added PFCOUNT command ([#1545](https://github.com/aws/glide-for-redis/pull/1545))
* Node: Added OBJECT FREQ command ([#1542](https://github.com/aws/glide-for-redis/pull/1542))
* Node: Added LINSERT command ([#1544](https://github.com/aws/glide-for-redis/pull/1544))
* Node: Added XLEN command ([#1555](https://github.com/aws/glide-for-redis/pull/1555))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
19 changes: 19 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ import {
createZRemRangeByScore,
createZScore,
createSUnionStore,
createXLen,
} from "./Commands";
import {
ClosingError,
Expand Down Expand Up @@ -2271,6 +2272,24 @@ export class BaseClient {
return this.createWritePromise(createXRead(keys_and_ids, options));
}

/**
* Returns the number of entries in the stream stored at `key`.
*
* See https://valkey.io/commands/xlen/ for more details.
*
* @param key - The key of the stream.
* @returns The number of entries in the stream. If `key` does not exist, returns `0`.
*
* @example
* ```typescript
* const numEntries = await client.xlen("my_stream");
* console.log(numEntries); // Output: 2 - "my_stream" contains 2 entries.
* ```
*/
public xlen(key: string): Promise<number> {
return this.createWritePromise(createXLen(key));
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
7 changes: 7 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,13 @@ export function createXRead(
return createCommand(RequestType.XRead, args);
}

/**
* @internal
*/
export function createXLen(key: string): redis_request.Command {
return createCommand(RequestType.XLen, [key]);
}

/**
* @internal
*/
Expand Down
14 changes: 14 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ import {
createZRemRangeByScore,
createZScore,
createSUnionStore,
createXLen,
} from "./Commands";
import { redis_request } from "./ProtobufMessage";

Expand Down Expand Up @@ -1346,6 +1347,19 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXRead(keys_and_ids, options));
}

/**
* Returns the number of entries in the stream stored at `key`.
*
* See https://valkey.io/commands/xlen/ for more details.
*
* @param key - The key of the stream.
*
* Command Response - The number of entries in the stream. If `key` does not exist, returns `0`.
*/
public xlen(key: string): T {
return this.addAndReturn(createXLen(key));
}

/**
* Renames `key` to `newkey`.
* If `newkey` already exists it is overwritten.
Expand Down
47 changes: 40 additions & 7 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2467,10 +2467,12 @@ export function runBaseTests<Context>(config: {
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`streams add and trim test_%p`,
async () => {
`streams add, trim, and len test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient) => {
const key = uuidv4();
const nonExistingKey = uuidv4();
const stringKey = uuidv4();
const field1 = uuidv4();
const field2 = uuidv4();

Expand Down Expand Up @@ -2501,7 +2503,7 @@ export function runBaseTests<Context>(config: {
[field2, "bar2"],
]),
).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

// this will trim the first entry.
const id = await client.xadd(
Expand All @@ -2519,7 +2521,7 @@ export function runBaseTests<Context>(config: {
},
);
expect(id).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

// this will trim the 2nd entry.
expect(
Expand All @@ -2538,7 +2540,7 @@ export function runBaseTests<Context>(config: {
},
),
).not.toBeNull();
expect(await client.customCommand(["XLEN", key])).toEqual(2);
expect(await client.xlen(key)).toEqual(2);

expect(
await client.xtrim(key, {
Expand All @@ -2547,8 +2549,39 @@ export function runBaseTests<Context>(config: {
exact: true,
}),
).toEqual(1);
expect(await client.customCommand(["XLEN", key])).toEqual(1);
}, ProtocolVersion.RESP2);
expect(await client.xlen(key)).toEqual(1);

expect(
await client.xtrim(key, {
method: "maxlen",
threshold: 0,
exact: true,
}),
).toEqual(1);
// Unlike other Redis collection types, stream keys still exist even after removing all entries
expect(await client.exists([key])).toEqual(1);
expect(await client.xlen(key)).toEqual(0);

expect(
await client.xtrim(nonExistingKey, {
method: "maxlen",
threshold: 1,
exact: true,
}),
).toEqual(0);
expect(await client.xlen(nonExistingKey)).toEqual(0);

// key exists, but it is not a stream
expect(await client.set(stringKey, "foo")).toEqual("OK");
await expect(
client.xtrim(stringKey, {
method: "maxlen",
threshold: 1,
exact: true,
}),
).rejects.toThrow();
await expect(client.xlen(stringKey)).rejects.toThrow();
}, protocol);
},
config.timeout,
);
Expand Down
2 changes: 2 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ export async function transactionTest(
args.push("0-2");
baseTransaction.xadd(key9, [["field", "value3"]], { id: "0-3" });
args.push("0-3");
baseTransaction.xlen(key9);
args.push(3);
baseTransaction.xread({ [key9]: "0-1" });
args.push({
[key9]: {
Expand Down

0 comments on commit 2c8b8ff

Please sign in to comment.