diff --git a/eslint.config.mjs b/eslint.config.mjs index 21995480f4..a96d4fdecd 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -2,6 +2,7 @@ import eslint from "@eslint/js"; import prettierConfig from "eslint-config-prettier"; import tseslint from "typescript-eslint"; +import jsdoc from "eslint-plugin-jsdoc"; export default tseslint.config( eslint.configs.recommended, @@ -54,6 +55,13 @@ export default tseslint.config( next: "*", }, ], + "@typescript-eslint/indent": ["error", 4, { + "SwitchCase": 1, + "ObjectExpression": 1, + "FunctionDeclaration": {"parameters": "first"}, + "FunctionExpression": {"parameters": "first"}, + "ignoredNodes": ["TSTypeParameterInstantiation"] + }], }, }, prettierConfig, diff --git a/node/src/Commands.ts b/node/src/Commands.ts index 96a9e65764..cd6f10e584 100644 --- a/node/src/Commands.ts +++ b/node/src/Commands.ts @@ -3855,6 +3855,18 @@ export interface BaseScanOptions { */ export interface ScanOptions extends BaseScanOptions { type?: ObjectType; +} + +/** + * Options for the SCAN command. + * `match`: The match filter is applied to the result of the command and will only include keys that match the pattern specified. + * `count`: `COUNT` is a just a hint for the command for how many elements to fetch from the server, the default is 10. + * `type`: The type of the object to scan. + * Types are the data types of Valkey: `string`, `list`, `set`, `zset`, `hash`, `stream`. + * `allowNonCoveredSlots`: If true, the scan will keep scanning even if slots are not covered by the cluster. + * By default, the scan will stop if slots are not covered by the cluster. + */ +export interface ClusterScanOptions extends ScanOptions { allowNonCoveredSlots?: boolean; } diff --git a/node/src/GlideClusterClient.ts b/node/src/GlideClusterClient.ts index 326ab7a949..4e9aee579d 100644 --- a/node/src/GlideClusterClient.ts +++ b/node/src/GlideClusterClient.ts @@ -23,7 +23,7 @@ import { FunctionStatsSingleResponse, InfoOptions, LolwutOptions, - ScanOptions, + ClusterScanOptions, createClientGetName, createClientId, createConfigGet, @@ -146,7 +146,7 @@ export namespace GlideClusterClientConfiguration { /** * Configuration options for creating a {@link GlideClusterClient | GlideClusterClient}. * - * Extends `BaseClientConfiguration` with properties specific to `GlideClusterClient`, such as periodic topology checks + * Extends {@link BaseClientConfiguration | BaseClientConfiguration} with properties specific to `GlideClusterClient`, such as periodic topology checks * and Pub/Sub subscription settings. * * @remarks @@ -579,7 +579,7 @@ export class GlideClusterClient extends BaseClient { */ protected scanOptionsToProto( cursor: string, - options?: ScanOptions, + options?: ClusterScanOptions, ): command_request.ClusterScan { const command = command_request.ClusterScan.create(); command.cursor = cursor; @@ -605,7 +605,7 @@ export class GlideClusterClient extends BaseClient { */ protected createClusterScanPromise( cursor: ClusterScanCursor, - options?: ScanOptions & DecoderOption, + options?: ClusterScanOptions & DecoderOption, ): Promise<[ClusterScanCursor, GlideString[]]> { // separate decoder option from scan options const { decoder, ...scanOptions } = options || {}; @@ -634,7 +634,7 @@ export class GlideClusterClient extends BaseClient { * * @param cursor - The cursor object that wraps the scan state. * To start a new scan, create a new empty `ClusterScanCursor` using {@link ClusterScanCursor}. - * @param options - (Optional) The scan options, see {@link ScanOptions} and {@link DecoderOption}. + * @param options - (Optional) The scan options, see {@link ClusterScanOptions} and {@link DecoderOption}. * @returns A Promise resolving to an array containing the next cursor and an array of keys, * formatted as [`ClusterScanCursor`, `string[]`]. * @@ -675,7 +675,7 @@ export class GlideClusterClient extends BaseClient { */ public async scan( cursor: ClusterScanCursor, - options?: ScanOptions & DecoderOption, + options?: ClusterScanOptions & DecoderOption, ): Promise<[ClusterScanCursor, GlideString[]]> { return this.createClusterScanPromise(cursor, options); } diff --git a/node/tests/ScanTest.test.ts b/node/tests/ScanTest.test.ts index bff90bab36..bb370a81db 100644 --- a/node/tests/ScanTest.test.ts +++ b/node/tests/ScanTest.test.ts @@ -12,6 +12,7 @@ import { GlideString, ObjectType, ProtocolVersion, + GlideClusterClientConfiguration, } from ".."; import { ValkeyCluster } from "../../utils/TestUtils.js"; import { @@ -19,6 +20,7 @@ import { getClientConfigurationOption, getServerVersion, parseEndpoints, + waitForClusterReady as isClusterReadyWithExpectedNodeCount, } from "./TestUtilities"; const TIMEOUT = 50000; @@ -376,6 +378,166 @@ describe("Scan GlideClusterClient", () => { }, TIMEOUT, ); + + it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])( + `GlideClusterClient scan with allowNonCoveredSlots %p`, + async (protocol) => { + const testCluster = await ValkeyCluster.createCluster( + true, + 3, + 0, + getServerVersion, + ); + const config: GlideClusterClientConfiguration = { + addresses: testCluster + .getAddresses() + .map(([host, port]) => ({ host, port })), + protocol, + }; + const testClient = await GlideClusterClient.createClient(config); + + try { + for (let i = 0; i < 10000; i++) { + const result = await testClient.set(`${uuidv4()}`, "value"); + expect(result).toBe("OK"); + } + + // Perform an initial scan to ensure all works as expected + let cursor = new ClusterScanCursor(); + let result = await testClient.scan(cursor); + cursor = result[0]; + expect(cursor.isFinished()).toBe(false); + + // Set 'cluster-require-full-coverage' to 'no' to allow operations with missing slots + await testClient.configSet({ + "cluster-require-full-coverage": "no", + }); + + // Forget one server to simulate a node failure + const addresses = testCluster.getAddresses(); + const addressToForget = addresses[0]; + const allOtherAddresses = addresses.slice(1); + const idToForget = await testClient.customCommand( + ["CLUSTER", "MYID"], + { + route: { + type: "routeByAddress", + host: addressToForget[0], + port: addressToForget[1], + }, + }, + ); + + for (const address of allOtherAddresses) { + await testClient.customCommand( + ["CLUSTER", "FORGET", idToForget as string], + { + route: { + type: "routeByAddress", + host: address[0], + port: address[1], + }, + }, + ); + } + + // Wait for the cluster to stabilize after forgetting a node + const ready = await isClusterReadyWithExpectedNodeCount( + testClient, + allOtherAddresses.length, + ); + expect(ready).toBe(true); + + // Attempt to scan without 'allowNonCoveredSlots', expecting an error + // Since it might take time for the inner core to forget the missing node, + // we retry the scan until the expected error is thrown. + + const maxRetries = 10; + let retries = 0; + let errorReceived = false; + + while (retries < maxRetries && !errorReceived) { + retries++; + cursor = new ClusterScanCursor(); + + try { + while (!cursor.isFinished()) { + result = await testClient.scan(cursor); + cursor = result[0]; + } + + // If scan completes without error, wait and retry + await new Promise((resolve) => + setTimeout(resolve, 1000), + ); + } catch (error) { + if ( + error instanceof Error && + error.message.includes( + "Could not find an address covering a slot, SCAN operation cannot continue", + ) + ) { + // Expected error occurred + errorReceived = true; + } else { + // Unexpected error, rethrow + throw error; + } + } + } + + expect(errorReceived).toBe(true); + + // Perform scan with 'allowNonCoveredSlots: true' + cursor = new ClusterScanCursor(); + + while (!cursor.isFinished()) { + result = await testClient.scan(cursor, { + allowNonCoveredSlots: true, + }); + cursor = result[0]; + } + + expect(cursor.isFinished()).toBe(true); + + // Get keys using 'KEYS *' from the remaining nodes + const keys: GlideString[] = []; + + for (const address of allOtherAddresses) { + const result = await testClient.customCommand( + ["KEYS", "*"], + { + route: { + type: "routeByAddress", + host: address[0], + port: address[1], + }, + }, + ); + keys.push(...(result as GlideString[])); + } + + // Scan again with 'allowNonCoveredSlots: true' and collect results + cursor = new ClusterScanCursor(); + const results: GlideString[] = []; + + while (!cursor.isFinished()) { + result = await testClient.scan(cursor, { + allowNonCoveredSlots: true, + }); + results.push(...result[1]); + cursor = result[0]; + } + + // Compare the sets of keys obtained from 'KEYS *' and 'SCAN' + expect(new Set(results)).toEqual(new Set(keys)); + } finally { + testClient.close(); + await testCluster.close(); + } + }, + TIMEOUT, + ); }); //standalone tests diff --git a/node/tests/TestUtilities.ts b/node/tests/TestUtilities.ts index c7564f1648..a58abacb6c 100644 --- a/node/tests/TestUtilities.ts +++ b/node/tests/TestUtilities.ts @@ -84,6 +84,51 @@ function intoArrayInternal(obj: any, builder: string[]) { } } +// The function is used to check if the cluster is ready with the count nodes known command using the client supplied. +// The way it works is by parsing the response of the CLUSTER INFO command and checking if the cluster_state is ok and the cluster_known_nodes is equal to the count. +// If so, we know the cluster is ready, and it has the amount of nodes we expect. +export async function waitForClusterReady( + client: GlideClusterClient, + count: number, +): Promise { + const timeout = 20000; // 20 seconds timeout in milliseconds + const startTime = Date.now(); + + while (true) { + if (Date.now() - startTime > timeout) { + return false; + } + + const clusterInfo = await client.customCommand(["CLUSTER", "INFO"]); + // parse the response + const clusterInfoMap = new Map(); + + if (clusterInfo) { + const clusterInfoLines = clusterInfo + .toString() + .split("\n") + .filter((line) => line.length > 0); + + for (const line of clusterInfoLines) { + const [key, value] = line.split(":"); + + clusterInfoMap.set(key.trim(), value.trim()); + } + + if ( + clusterInfoMap.get("cluster_state") == "ok" && + Number(clusterInfoMap.get("cluster_known_nodes")) == count + ) { + break; + } + } + + await new Promise((resolve) => setTimeout(resolve, 2000)); + } + + return true; +} + /** * accept any variable `v` and convert it into String, recursively */ diff --git a/package.json b/package.json index 3f61298feb..c6676131a2 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,14 @@ { "devDependencies": { - "@eslint/js": "^9.10.0", + "@eslint/js": "9.17.0", "@types/eslint__js": "^8.42.3", "@types/eslint-config-prettier": "^6.11.3", - "eslint": "9.14.0", + "eslint": "9.17.0", "eslint-config-prettier": "^9.1.0", - "prettier": "^3.3.3", - "typescript": "^5.6.2", - "typescript-eslint": "^8.13" + "eslint-plugin-jsdoc": "^50.6.1", + "prettier": "3.4.2", + "prettier-eslint": "16.3.0", + "typescript": "5.7.2", + "typescript-eslint": "8.18.1" } } diff --git a/utils/TestUtils.ts b/utils/TestUtils.ts index 423bf8e9cb..9c89788528 100644 --- a/utils/TestUtils.ts +++ b/utils/TestUtils.ts @@ -21,9 +21,9 @@ function parseOutput(input: string): { .split(",") .map((address) => address.split(":")) .map((address) => [address[0], Number(address[1])]) as [ - string, - number, - ][]; + string, + number, + ][]; if (clusterFolder === undefined || ports === undefined) { throw new Error(`Insufficient data in input: ${input}`); @@ -82,7 +82,7 @@ export class ValkeyCluster { execFile( "python3", [PY_SCRIPT_PATH, ...command.split(" ")], - (error, stdout, stderr) => { + (error, stdout) => { if (error) { reject(error); } else {