Skip to content

Commit

Permalink
test: add tests for GlideClusterClient scan with allowNonCoveredSlots…
Browse files Browse the repository at this point in the history
… option

Signed-off-by: avifenesh <aviarchi1994@gmail.com>
  • Loading branch information
avifenesh committed Dec 18, 2024
1 parent 2a3b1db commit 2dec9d0
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 15 deletions.
8 changes: 8 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import eslint from "@eslint/js";
import prettierConfig from "eslint-config-prettier";
import tseslint from "typescript-eslint";
import jsdoc from "eslint-plugin-jsdoc";

export default tseslint.config(
eslint.configs.recommended,
Expand Down Expand Up @@ -54,6 +55,13 @@ export default tseslint.config(
next: "*",
},
],
"@typescript-eslint/indent": ["error", 4, {
"SwitchCase": 1,
"ObjectExpression": 1,
"FunctionDeclaration": {"parameters": "first"},
"FunctionExpression": {"parameters": "first"},
"ignoredNodes": ["TSTypeParameterInstantiation"]
}],
},
},
prettierConfig,
Expand Down
12 changes: 12 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3855,6 +3855,18 @@ export interface BaseScanOptions {
*/
export interface ScanOptions extends BaseScanOptions {
type?: ObjectType;
}

/**
* Options for the SCAN command.
* `match`: The match filter is applied to the result of the command and will only include keys that match the pattern specified.
* `count`: `COUNT` is a just a hint for the command for how many elements to fetch from the server, the default is 10.
* `type`: The type of the object to scan.
* Types are the data types of Valkey: `string`, `list`, `set`, `zset`, `hash`, `stream`.
* `allowNonCoveredSlots`: If true, the scan will keep scanning even if slots are not covered by the cluster.
* By default, the scan will stop if slots are not covered by the cluster.
*/
export interface ClusterScanOptions extends ScanOptions {
allowNonCoveredSlots?: boolean;
}

Expand Down
12 changes: 6 additions & 6 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
FunctionStatsSingleResponse,
InfoOptions,
LolwutOptions,
ScanOptions,
ClusterScanOptions,
createClientGetName,
createClientId,
createConfigGet,
Expand Down Expand Up @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration {
/**
* Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}.
*
* Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks
* Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks
* and Pub/Sub subscription settings.
*
* @remarks
Expand Down Expand Up @@ -579,7 +579,7 @@ export class GlideClusterClient extends BaseClient {
*/
protected scanOptionsToProto(
cursor: string,
options?: ScanOptions,
options?: ClusterScanOptions,
): command_request.ClusterScan {
const command = command_request.ClusterScan.create();
command.cursor = cursor;
Expand All @@ -605,7 +605,7 @@ export class GlideClusterClient extends BaseClient {
*/
protected createClusterScanPromise(
cursor: ClusterScanCursor,
options?: ScanOptions & DecoderOption,
options?: ClusterScanOptions & DecoderOption,
): Promise<[ClusterScanCursor, GlideString[]]> {
// separate decoder option from scan options
const { decoder, ...scanOptions } = options || {};
Expand Down Expand Up @@ -634,7 +634,7 @@ export class GlideClusterClient extends BaseClient {
*
* @param cursor - The cursor object that wraps the scan state.
* To start a new scan, create a new empty `ClusterScanCursor` using {@link ClusterScanCursor}.
* @param options - (Optional) The scan options, see {@link ScanOptions} and {@link DecoderOption}.
* @param options - (Optional) The scan options, see {@link ClusterScanOptions} and {@link DecoderOption}.
* @returns A Promise resolving to an array containing the next cursor and an array of keys,
* formatted as [`ClusterScanCursor`, `string[]`].
*
Expand Down Expand Up @@ -675,7 +675,7 @@ export class GlideClusterClient extends BaseClient {
*/
public async scan(
cursor: ClusterScanCursor,
options?: ScanOptions & DecoderOption,
options?: ClusterScanOptions & DecoderOption,
): Promise<[ClusterScanCursor, GlideString[]]> {
return this.createClusterScanPromise(cursor, options);
}
Expand Down
162 changes: 162 additions & 0 deletions node/tests/ScanTest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import {
GlideString,
ObjectType,
ProtocolVersion,
GlideClusterClientConfiguration,
} from "..";
import { ValkeyCluster } from "../../utils/TestUtils.js";
import {
flushAndCloseClient,
getClientConfigurationOption,
getServerVersion,
parseEndpoints,
waitForClusterReady as isClusterReadyWithExpectedNodeCount,
} from "./TestUtilities";

const TIMEOUT = 50000;
Expand Down Expand Up @@ -376,6 +378,166 @@ describe("Scan GlideClusterClient", () => {
},
TIMEOUT,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`GlideClusterClient scan with allowNonCoveredSlots %p`,
async (protocol) => {
const testCluster = await ValkeyCluster.createCluster(
true,
3,
0,
getServerVersion,
);
const config: GlideClusterClientConfiguration = {
addresses: testCluster
.getAddresses()
.map(([host, port]) => ({ host, port })),
protocol,
};
const testClient = await GlideClusterClient.createClient(config);

try {
for (let i = 0; i < 10000; i++) {
const result = await testClient.set(`${uuidv4()}`, "value");
expect(result).toBe("OK");
}

// Perform an initial scan to ensure all works as expected
let cursor = new ClusterScanCursor();
let result = await testClient.scan(cursor);
cursor = result[0];
expect(cursor.isFinished()).toBe(false);

// Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots
await testClient.configSet({
"cluster-require-full-coverage": "no",
});

// Forget one server to simulate a node failure
const addresses = testCluster.getAddresses();
const addressToForget = addresses[0];
const allOtherAddresses = addresses.slice(1);
const idToForget = await testClient.customCommand(
["CLUSTER", "MYID"],
{
route: {
type: "routeByAddress",
host: addressToForget[0],
port: addressToForget[1],
},
},
);

for (const address of allOtherAddresses) {
await testClient.customCommand(
["CLUSTER", "FORGET", idToForget as string],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
}

// Wait for the cluster to stabilize after forgetting a node
const ready = await isClusterReadyWithExpectedNodeCount(
testClient,
allOtherAddresses.length,
);
expect(ready).toBe(true);

// Attempt to scan without 'allowNonCoveredSlots', expecting an error
// Since it might take time for the inner core to forget the missing node,
// we retry the scan until the expected error is thrown.

const maxRetries = 10;
let retries = 0;
let errorReceived = false;

while (retries < maxRetries && !errorReceived) {
retries++;
cursor = new ClusterScanCursor();

try {
while (!cursor.isFinished()) {
result = await testClient.scan(cursor);
cursor = result[0];
}

// If scan completes without error, wait and retry
await new Promise((resolve) =>
setTimeout(resolve, 1000),
);
} catch (error) {
if (
error instanceof Error &&
error.message.includes(
"Could not find an address covering a slot, SCAN operation cannot continue",
)
) {
// Expected error occurred
errorReceived = true;
} else {
// Unexpected error, rethrow
throw error;
}
}
}

expect(errorReceived).toBe(true);

// Perform scan with 'allowNonCoveredSlots: true'
cursor = new ClusterScanCursor();

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
cursor = result[0];
}

expect(cursor.isFinished()).toBe(true);

// Get keys using 'KEYS *' from the remaining nodes
const keys: GlideString[] = [];

for (const address of allOtherAddresses) {
const result = await testClient.customCommand(
["KEYS", "*"],
{
route: {
type: "routeByAddress",
host: address[0],
port: address[1],
},
},
);
keys.push(...(result as GlideString[]));
}

// Scan again with 'allowNonCoveredSlots: true' and collect results
cursor = new ClusterScanCursor();
const results: GlideString[] = [];

while (!cursor.isFinished()) {
result = await testClient.scan(cursor, {
allowNonCoveredSlots: true,
});
results.push(...result[1]);
cursor = result[0];
}

// Compare the sets of keys obtained from 'KEYS *' and 'SCAN'
expect(new Set(results)).toEqual(new Set(keys));
} finally {
testClient.close();
await testCluster.close();
}
},
TIMEOUT,
);
});

//standalone tests
Expand Down
45 changes: 45 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,51 @@ function intoArrayInternal(obj: any, builder: string[]) {
}
}

// The function is used to check if the cluster is ready with the count nodes known command using the client supplied.
// The way it works is by parsing the response of the CLUSTER INFO command and checking if the cluster_state is ok and the cluster_known_nodes is equal to the count.
// If so, we know the cluster is ready, and it has the amount of nodes we expect.
export async function waitForClusterReady(
client: GlideClusterClient,
count: number,
): Promise<boolean> {
const timeout = 20000; // 20 seconds timeout in milliseconds
const startTime = Date.now();

while (true) {
if (Date.now() - startTime > timeout) {
return false;
}

const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]);
// parse the response
const clusterInfoMap = new Map<string, string>();

if (clusterInfo) {
const clusterInfoLines = clusterInfo
.toString()
.split("\n")
.filter((line) => line.length > 0);

for (const line of clusterInfoLines) {
const [key, value] = line.split(":");

clusterInfoMap.set(key.trim(), value.trim());
}

if (
clusterInfoMap.get("cluster_state") == "ok" &&
Number(clusterInfoMap.get("cluster_known_nodes")) == count
) {
break;
}
}

await new Promise((resolve) => setTimeout(resolve, 2000));
}

return true;
}

/**
* accept any variable `v` and convert it into String, recursively
*/
Expand Down
12 changes: 7 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
{
"devDependencies": {
"@eslint/js": "^9.10.0",
"@eslint/js": "9.17.0",
"@types/eslint__js": "^8.42.3",
"@types/eslint-config-prettier": "^6.11.3",
"eslint": "9.14.0",
"eslint": "9.17.0",
"eslint-config-prettier": "^9.1.0",
"prettier": "^3.3.3",
"typescript": "^5.6.2",
"typescript-eslint": "^8.13"
"eslint-plugin-jsdoc": "^50.6.1",
"prettier": "3.4.2",
"prettier-eslint": "16.3.0",
"typescript": "5.7.2",
"typescript-eslint": "8.18.1"
}
}
8 changes: 4 additions & 4 deletions utils/TestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ function parseOutput(input: string): {
.split(",")
.map((address) => address.split(":"))
.map((address) => [address[0], Number(address[1])]) as [
string,
number,
][];
string,
number,
][];

if (clusterFolder === undefined || ports === undefined) {
throw new Error(`Insufficient data in input: ${input}`);
Expand Down Expand Up @@ -82,7 +82,7 @@ export class ValkeyCluster {
execFile(
"python3",
[PY_SCRIPT_PATH, ...command.split(" ")],
(error, stdout, stderr) => {
(error, stdout) => {
if (error) {
reject(error);
} else {
Expand Down

0 comments on commit 2dec9d0

Please sign in to comment.