Skip to content

Commit

Permalink
test: add tests for GlideClusterClient scan with allowNonCoveredSlots…
Browse files Browse the repository at this point in the history
… option

Signed-off-by: avifenesh <aviarchi1994@gmail.com>
  • Loading branch information
avifenesh committed Dec 15, 2024
1 parent fb2c42f commit 9c6b040
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 5 deletions.
2 changes: 1 addition & 1 deletion node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration {
/**
* Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}.
*
* Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks
* Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks
* and Pub/Sub subscription settings.
*
* @remarks
Expand Down
162 changes: 162 additions & 0 deletions node/tests/ScanTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
GlideString,
ObjectType,
ProtocolVersion,
GlideClusterClientConfiguration,
} from "..";
import { ValkeyCluster } from "../../utils/TestUtils.js";
import {
flushAndCloseClient,
getClientConfigurationOption,
getServerVersion,
parseEndpoints,
waitForClusterReady as isClusterReadyWithExpectedNodeCount,
} from "./TestUtilities";

const TIMEOUT = 50000;
Expand Down Expand Up @@ -376,6 +378,166 @@ describe("Scan GlideClusterClient", () => {
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with allowNonCoveredSlots %p`,
async (protocol) => {
const testCluster = await ValkeyCluster.createCluster(
true,
3,
0,
getServerVersion,
);
const config: GlideClusterClientConfiguration = {
addresses: testCluster
.getAddresses()
.map(([host, port]) => ({ host, port })),
protocol,
};
const testClient = await GlideClusterClient.createClient(config);

try {
for (let i = 0; i < 10000; i++) {
const result = await testClient.set(`${uuidv4()}`, "value");
expect(result).toBe("OK");
}

// Perform an initial scan to ensure all works as expected
let cursor = new ClusterScanCursor();
let result = await testClient.scan(cursor);
cursor = result[0];
expect(cursor.isFinished()).toBe(false);

// Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots
await testClient.configSet({
"cluster-require-full-coverage": "no",
});

// Forget one server to simulate a node failure
const addresses = testCluster.getAddresses();
const addressToForget = addresses[0];
const allOtherAddresses = addresses.slice(1);
const idToForget = await testClient.customCommand(
["CLUSTER", "MYID"],
{
route: {
type: "routeByAddress",
host: addressToForget[0],
port: addressToForget[1],
},
},
);

for (const address of allOtherAddresses) {
await testClient.customCommand(
["CLUSTER", "FORGET", idToForget as string],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
}

// Wait for the cluster to stabilize after forgetting a node
const ready = await isClusterReadyWithExpectedNodeCount(
testClient,
allOtherAddresses.length,
);
expect(ready).toBe(true);

// Attempt to scan without 'allowNonCoveredSlots', expecting an error
// Since it might take time for the inner core to forget the missing node,
// we retry the scan until the expected error is thrown.

const maxRetries = 10;
let retries = 0;
let errorReceived = false;

while (retries < maxRetries && !errorReceived) {
retries++;
cursor = new ClusterScanCursor();

try {
while (!cursor.isFinished()) {
result = await testClient.scan(cursor);
cursor = result[0];
}

// If scan completes without error, wait and retry
await new Promise((resolve) =>
setTimeout(resolve, 1000),
);
} catch (error) {
if (
error instanceof Error &&
error.message.includes(
"Could not find an address covering a slot, SCAN operation cannot continue",
)
) {
// Expected error occurred
errorReceived = true;
} else {
// Unexpected error, rethrow
throw error;
}
}
}

expect(errorReceived).toBe(true);

// Perform scan with 'allowNonCoveredSlots: true'
cursor = new ClusterScanCursor();

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
cursor = result[0];
}

expect(cursor.isFinished()).toBe(true);

// Get keys using 'KEYS *' from the remaining nodes
const keys: GlideString[] = [];

for (const address of allOtherAddresses) {
const result = await testClient.customCommand(
["KEYS", "*"],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
keys.push(...(result as GlideString[]));
}

// Scan again with 'allowNonCoveredSlots: true' and collect results
cursor = new ClusterScanCursor();
const results: GlideString[] = [];

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
results.push(...result[1]);
cursor = result[0];
}

// Compare the sets of keys obtained from 'KEYS *' and 'SCAN'
expect(new Set(results)).toEqual(new Set(keys));
} finally {
testClient.close();
await testCluster.close();
}
},
TIMEOUT,
);
});

//standalone tests
Expand Down
42 changes: 42 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,48 @@ function intoArrayInternal(obj: any, builder: string[]) {
}
}

export async function waitForClusterReady(
client: GlideClusterClient,
count: number,
): Promise<boolean> {
const timeout = 20000; // 20 seconds timeout in milliseconds
const startTime = Date.now();

while (true) {
if (Date.now() - startTime > timeout) {
return false;
}

const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]);
// parse the response
const clusterInfoMap = new Map<string, string>();

if (clusterInfo) {
const clusterInfoLines = clusterInfo
.toString()
.split("\n")
.filter((line) => line.length > 0);

for (const line of clusterInfoLines) {
const [key, value] = line.split(":");

clusterInfoMap.set(key.trim(), value.trim());
}

if (
clusterInfoMap.get("cluster_state") == "ok" &&
Number(clusterInfoMap.get("cluster_known_nodes")) == count
) {
break;
}
}

await new Promise((resolve) => setTimeout(resolve, 2000));
}

return true;
}

/**
* accept any variable `v` and convert it into String, recursively
*/
Expand Down
8 changes: 4 additions & 4 deletions utils/TestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ function parseOutput(input: string): {
.split(",")
.map((address) => address.split(":"))
.map((address) => [address[0], Number(address[1])]) as [
string,
number,
][];
string,
number,
][];

if (clusterFolder === undefined || ports === undefined) {
throw new Error(`Insufficient data in input: ${input}`);
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ValkeyCluster {
execFile(
"python3",
[PY_SCRIPT_PATH, ...command.split(" ")],
(error, stdout, stderr) => {
(error, stdout) => {
if (error) {
reject(error);
} else {
Expand Down

0 comments on commit 9c6b040

Please sign in to comment.