Skip to content

Commit

Permalink
Node: Add support for XADD, XTRIM commands. (valkey-io#1057)
Browse files Browse the repository at this point in the history
Co-authored-by: nihohit <nihohit@gmail.com>
  • Loading branch information
shachlanAmazon and nihohit authored Mar 10, 2024
1 parent 1f6ed5f commit 705ee41
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Python, Node: Added PTTL command ([#1036](https://github.com/aws/glide-for-redis/pull/1036), [#1082](https://github.com/aws/glide-for-redis/pull/1082))
* Node: Added HVAL command ([#1022](https://github.com/aws/glide-for-redis/pull/1022))
* Node: Added PERSIST command ([#1023](https://github.com/aws/glide-for-redis/pull/1023))
* Node: Added Xadd, Xtrim commands. ([#1057](https://github.com/aws/glide-for-redis/pull/1057))

#### Features

Expand Down
32 changes: 32 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import {
ExpireOptions,
ScoreLimit,
SetOptions,
StreamAddOptions,
StreamTrimOptions,
ZaddOptions,
createDecr,
createDecrBy,
Expand Down Expand Up @@ -61,6 +63,8 @@ import {
createTTL,
createType,
createUnlink,
createXadd,
createXtrim,
createZadd,
createZcard,
createZcount,
Expand Down Expand Up @@ -1207,6 +1211,34 @@ export class BaseClient {
return this.createWritePromise(createZremRangeByRank(key, start, end));
}

/**
* Adds an entry to the specified stream.
* See https://redis.io/commands/xadd/ for more details.
*
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public xadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): Promise<string | null> {
return this.createWritePromise(createXadd(key, values, options));
}

/**
* Trims the stream by evicting older entries.
* See https://redis.io/commands/xtrim/ for more details.
*
* @param key - the key of the stream
* @param options - options detailing how to trim the stream.
* @returns The number of entries deleted from the stream.
*/
public xtrim(key: string, options: StreamTrimOptions): Promise<number> {
return this.createWritePromise(createXtrim(key, options));
}

private readonly MAP_READ_FROM_STRATEGY: Record<
ReadFrom,
connection_request.ReadFrom
Expand Down
111 changes: 109 additions & 2 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -916,9 +916,116 @@ export function createZremRangeByRank(
]);
}

export function createPersist(key: string): redis_request.Command {
return createCommand(RequestType.Persist, [key]);
}

export type StreamTrimOptions = (
| {
/**
* Trim the stream according to entry ID.
* Equivalent to `MINID` in the Redis API.
*/
method: "minid";
threshold: string;
}
| {
/**
* Trim the stream according to length.
* Equivalent to `MAXLEN` in the Redis API.
*/
method: "maxlen";
threshold: number;
}
) & {
/**
* If `true`, the stream will be trimmed exactly. Equivalent to `=` in the Redis API. Otherwise the stream will be trimmed in a near-exact manner, which is more efficient, equivalent to `~` in the Redis API.
*/
exact: boolean;
/**
* If set, sets the maximal amount of entries that will be deleted.
*/
limit?: number;
};

export type StreamAddOptions = {
/**
* If set, the new entry will be added with this ID.
*/
id?: string;
/**
* If set to `false`, a new stream won't be created if no stream matches the given key.
* Equivalent to `NOMKSTREAM` in the Redis API.
*/
makeStream?: boolean;
/**
* If set, the add operation will also trim the older entries in the stream.
*/
trim?: StreamTrimOptions;
};

function addTrimOptions(options: StreamTrimOptions, args: string[]) {
if (options.method === "maxlen") {
args.push("MAXLEN");
} else if (options.method === "minid") {
args.push("MINID");
}

if (options.exact) {
args.push("=");
} else {
args.push("~");
}

if (options.method === "maxlen") {
args.push(options.threshold.toString());
} else if (options.method === "minid") {
args.push(options.threshold);
}

if (options.limit) {
args.push("LIMIT");
args.push(options.limit.toString());
}
}

export function createXadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): redis_request.Command {
const args = [key];

if (options?.makeStream === false) {
args.push("NOMKSTREAM");
}

if (options?.trim) {
addTrimOptions(options.trim, args);
}

if (options?.id) {
args.push(options.id);
} else {
args.push("*");
}

values.forEach(([field, value]) => {
args.push(field);
args.push(value);
});

return createCommand(RequestType.XAdd, args);
}

/**
* @internal
*/
export function createPersist(key: string): redis_request.Command {
return createCommand(RequestType.Persist, [key]);
export function createXtrim(
key: string,
options: StreamTrimOptions,
): redis_request.Command {
const args = [key];
addTrimOptions(options, args);
return createCommand(RequestType.XTrim, args);
}
32 changes: 32 additions & 0 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
InfoOptions,
ScoreLimit,
SetOptions,
StreamAddOptions,
StreamTrimOptions,
ZaddOptions,
createClientGetName,
createClientId,
Expand Down Expand Up @@ -64,6 +66,8 @@ import {
createTTL,
createType,
createUnlink,
createXadd,
createXtrim,
createZadd,
createZcard,
createZcount,
Expand Down Expand Up @@ -1011,6 +1015,34 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
public lindex(key: string, index: number): T {
return this.addAndReturn(createLindex(key, index));
}

/**
* Adds an entry to the specified stream.
* See https://redis.io/commands/xadd/ for more details.
*
* @param key - The key of the stream.
* @param values - field-value pairs to be added to the entry.
* @returns The id of the added entry, or `null` if `options.makeStream` is set to `false` and no stream with the matching `key` exists.
*/
public xadd(
key: string,
values: [string, string][],
options?: StreamAddOptions,
): T {
return this.addAndReturn(createXadd(key, values, options));
}

/**
* Trims the stream by evicting older entries.
* See https://redis.io/commands/xtrim/ for more details.
*
* @param key - the key of the stream
* @param options - options detailing how to trim the stream.
* @returns The number of entries deleted from the stream.
*/
public xtrim(key: string, options: StreamTrimOptions): T {
return this.addAndReturn(createXtrim(key, options));
}
}

/**
Expand Down
143 changes: 143 additions & 0 deletions node/tests/RedisClientInternals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,4 +627,147 @@ describe("SocketConnectionInternals", () => {
expect(result2).toBeNull();
});
});

it("should set arguments according to xadd options", async () => {
let counter = 0;
await testWithClusterResources(async (connection, socket) => {
socket.on("data", (data) => {
const reader = Reader.create(data);
const request =
redis_request.RedisRequest.decodeDelimited(reader);
expect(request.singleCommand?.requestType).toEqual(
RequestType.XAdd,
);

if (counter === 0) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"foo",
"*",
"a",
"1",
"b",
"2",
]);
} else if (counter === 1) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"bar",
"YOLO",
"a",
"1",
]);
} else if (counter === 2) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"baz",
"MAXLEN",
"=",
"1000",
"*",
"c",
"3",
]);
} else if (counter === 3) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"foobar",
"NOMKSTREAM",
"MINID",
"~",
"foo",
"LIMIT",
"1000",
"*",
"d",
"4",
]);
} else {
throw new Error("too many requests! " + counter);
}

counter = counter + 1;

sendResponse(socket, ResponseType.Null, request.callbackIdx);
});
await connection.xadd("foo", [
["a", "1"],
["b", "2"],
]);

await connection.xadd("bar", [["a", "1"]], {
id: "YOLO",
makeStream: true,
});

await connection.xadd("baz", [["c", "3"]], {
trim: {
method: "maxlen",
threshold: 1000,
exact: true,
},
});

await connection.xadd("foobar", [["d", "4"]], {
makeStream: false,
trim: {
method: "minid",
threshold: "foo",
exact: false,
limit: 1000,
},
});

expect(counter).toEqual(4);
});
});

it("should set arguments according to xtrim options", async () => {
let counter = 0;
await testWithClusterResources(async (connection, socket) => {
socket.on("data", (data) => {
const reader = Reader.create(data);
const request =
redis_request.RedisRequest.decodeDelimited(reader);
expect(request.singleCommand?.requestType).toEqual(
RequestType.XTrim,
);

if (counter === 0) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"foo",
"MAXLEN",
"=",
"1000",
]);
} else if (counter === 1) {
expect(request.singleCommand?.argsArray?.args).toEqual([
"bar",
"MINID",
"~",
"foo",
"LIMIT",
"1000",
]);
} else {
throw new Error("too many requests! " + counter);
}

counter = counter + 1;

sendResponse(socket, ResponseType.Null, request.callbackIdx);
});

await connection.xtrim("foo", {
method: "maxlen",
threshold: 1000,
exact: true,
});

await connection.xtrim("bar", {
method: "minid",
threshold: "foo",
exact: false,
limit: 1000,
});

expect(counter).toEqual(2);
});
});
});
Loading

0 comments on commit 705ee41

Please sign in to comment.