Skip to content

Commit

Permalink
Add xinfo stream to transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <andrew.carbonetto@improving.com>
  • Loading branch information
acarbonetto committed Aug 2, 2024
1 parent 64c26b0 commit 5b8a668
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
20 changes: 18 additions & 2 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ import {
StreamReadOptions,
StreamTrimOptions,
ZAddOptions,
createBLMPop,
createBLMove,
createBLPop,
createBLMPop,
createBRPop,
createBZMPop,
createBitCount,
Expand Down Expand Up @@ -113,8 +113,8 @@ import {
createLIndex,
createLInsert,
createLLen,
createLMove,
createLMPop,
createLMove,
createLPop,
createLPos,
createLPush,
Expand Down Expand Up @@ -177,6 +177,7 @@ import {
createUnlink,
createXAdd,
createXDel,
createXInfoStream,
createXLen,
createXRead,
createXTrim,
Expand Down Expand Up @@ -2060,6 +2061,21 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createXTrim(key, options));
}

/**
* Returns information about the stream stored at `key`.
*
* @param key - The key of the stream.
* @param fullOptions - If `true`, returns verbose information with a limit of the first 10 PEL entries.
* If `number` is specified, returns verbose information limiting the returned PEL entries.
* If `0` is specified, returns verbose information with no limit.
*
* Command Response - A map of detailed stream information for the given `key`. See
* the example for a sample response.
*/
public xinfoStream(key: string, fullOptions?: boolean | number): T {
return this.addAndReturn(createXInfoStream(key, fullOptions ?? false));
}

/** Returns the server time.
* See https://valkey.io/commands/time/ for details.
*
Expand Down
55 changes: 55 additions & 0 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,61 @@ describe("GlideClient", () => {
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"xinfo stream transaction test_%p",
async (protocol) => {
const client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol),
);

const key = uuidv4();

const transaction = new Transaction();
transaction.xadd(key, [["field1", "value1"]], { id: "0-1" });
transaction.xinfoStream(key);
transaction.xinfoStream(key, true);
const result = await client.exec(transaction);
expect(result).not.toBeNull();

const versionLessThan7 =
cluster.checkIfServerVersionLessThan("7.0.0");

const expectedXinfoStreamResult = {
length: 1,
"radix-tree-keys": 1,
"radix-tree-nodes": 2,
"last-generated-id": "0-1",
groups: 0,
"first-entry": ["0-1", ["field1", "value1"]],
"last-entry": ["0-1", ["field1", "value1"]],
"max-deleted-entry-id": versionLessThan7 ? undefined : "0-0",
"entries-added": versionLessThan7 ? undefined : 1,
"recorded-first-entry-id": versionLessThan7 ? undefined : "0-1",
};

const expectedXinfoStreamFullResult = {
length: 1,
"radix-tree-keys": 1,
"radix-tree-nodes": 2,
"last-generated-id": "0-1",
entries: [["0-1", ["field1", "value1"]]],
groups: [],
"max-deleted-entry-id": versionLessThan7 ? undefined : "0-0",
"entries-added": versionLessThan7 ? undefined : 1,
"recorded-first-entry-id": versionLessThan7 ? undefined : "0-1",
};

if (result != null) {
expect(result[0]).toEqual("0-1"); // xadd
expect(result[1]).toEqual(expectedXinfoStreamResult);
expect(result[2]).toEqual(expectedXinfoStreamFullResult);
}

client.close();
},
TIMEOUT,
);

runBaseTests<Context>({
init: async (protocol, clientName?) => {
const options = getClientConfigurationOption(
Expand Down

0 comments on commit 5b8a668

Please sign in to comment.