Skip to content

Commit

Permalink
Node: Update tests for blocking commands. (valkey-io#2127)
Browse files Browse the repository at this point in the history
* Update tests for blocking commands.

Signed-off-by: Yury-Fridlyand <yury.fridlyand@improving.com>
  • Loading branch information
Yury-Fridlyand authored Aug 22, 2024
1 parent 20f8868 commit 99f9e88
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 43 deletions.
4 changes: 2 additions & 2 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2497,8 +2497,8 @@ export enum FlushMode {
export type StreamReadOptions = {
/**
* If set, the read request will block for the set amount of milliseconds or
* until the server has the required number of entries. Equivalent to `BLOCK`
* in the Redis API.
* until the server has the required number of entries. A value of `0` will block indefinitely.
* Equivalent to `BLOCK` in the Redis API.
*/
block?: number;
/**
Expand Down
41 changes: 0 additions & 41 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { v4 as uuidv4 } from "uuid";
import {
Decoder,
GlideClient,
ListDirection,
ProtocolVersion,
RequestError,
Transaction,
Expand Down Expand Up @@ -130,46 +129,6 @@ describe("GlideClient", () => {
},
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"check that blocking commands returns never timeout_%p",
async (protocol) => {
client = await GlideClient.createClient(
getClientConfigurationOption(cluster.getAddresses(), protocol, {
requestTimeout: 300,
}),
);

const promiseList = [
client.blmove(
"source",
"destination",
ListDirection.LEFT,
ListDirection.LEFT,
0.1,
),
client.blmpop(["key1", "key2"], ListDirection.LEFT, 0.1),
client.bzpopmax(["key1", "key2"], 0),
client.bzpopmin(["key1", "key2"], 0),
];

try {
for (const promise of promiseList) {
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500);
});
await Promise.race([promise, timeoutPromise]);
}
} finally {
for (const promise of promiseList) {
await Promise.resolve([promise]);
}

client.close();
}
},
5000,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"select dbsize flushdb test %p",
async (protocol) => {
Expand Down
77 changes: 77 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
ListDirection,
ProtocolVersion,
RequestError,
ReturnType,
ScoreFilter,
Script,
SignedEncoding,
Expand Down Expand Up @@ -10476,6 +10477,82 @@ export function runBaseTests(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
"check that blocking commands never time out %p",
async (protocol) => {
await runTest(async (client: BaseClient, cluster) => {
const key1 = "{blocking}-1-" + uuidv4();
const key2 = "{blocking}-2-" + uuidv4();
const key3 = "{blocking}-3-" + uuidv4(); // stream
const keyz = [key1, key2];

// create a group and a stream, so `xreadgroup` won't fail on missing group
await client.xgroupCreate(key3, "group", "0", {
mkStream: true,
});

const promiseList: [string, Promise<ReturnType>][] = [
["bzpopmax", client.bzpopmax(keyz, 0)],
["bzpopmin", client.bzpopmin(keyz, 0)],
["blpop", client.blpop(keyz, 0)],
["brpop", client.brpop(keyz, 0)],
["xread", client.xread({ [key3]: "0-0" }, { block: 0 })],
[
"xreadgroup",
client.xreadgroup(
"group",
"consumer",
{ [key3]: "0-0" },
{ block: 0 },
),
],
["wait", client.wait(42, 0)],
];

if (!cluster.checkIfServerVersionLessThan("6.2.0")) {
promiseList.push([
"blmove",
client.blmove(
key1,
key2,
ListDirection.LEFT,
ListDirection.LEFT,
0,
),
]);
}

if (!cluster.checkIfServerVersionLessThan("7.0.0")) {
promiseList.push(
["blmpop", client.blmpop(keyz, ListDirection.LEFT, 0)],
["bzmpop", client.bzmpop(keyz, ScoreFilter.MAX, 0)],
);
}

try {
for (const [name, promise] of promiseList) {
const timeoutPromise = new Promise((resolve) => {
setTimeout(resolve, 500, "timeOutPromiseWins");
});
// client has default request timeout 250 ms, we run all commands with infinite blocking
// we expect that all commands will still await for the response even after 500 ms
expect(
await Promise.race([
promise.finally(() =>
fail(`${name} didn't block infintely`),
),
timeoutPromise,
]),
).toEqual("timeOutPromiseWins");
}
} finally {
client.close();
}
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`getex test_%p`,
async (protocol) => {
Expand Down

0 comments on commit 99f9e88

Please sign in to comment.