Skip to content

Commit

Permalink
Node: add AZ Affinity ReadFrom strategy Support (valkey-io#2686)
Browse files Browse the repository at this point in the history
* Added AZAffinity strategy to Node.js

---------

Signed-off-by: Muhammad Awawdi <mawawdi@amazon.com>
  • Loading branch information
Muhammad-awawdi-amazon authored Nov 20, 2024
1 parent ea5667b commit 3c25ab9
Show file tree
Hide file tree
Showing 3 changed files with 482 additions and 16 deletions.
14 changes: 14 additions & 0 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,18 @@ export interface BaseClientConfiguration {
* used.
*/
inflightRequestsLimit?: number;
/**
* Availability Zone of the client.
* If ReadFrom strategy is AZAffinity, this setting ensures that readonly commands are directed to replicas within the specified AZ if exits.
*
* @example
* ```typescript
* // Example configuration for setting client availability zone and read strategy
* configuration.clientAz = 'us-east-1a'; // Sets the client's availability zone
* configuration.readFrom = 'AZAffinity'; // Directs read operations to nodes within the same AZ
* ```
*/
clientAz?: string;
}

/**
Expand Down Expand Up @@ -719,6 +731,7 @@ export class BaseClient {
private readonly pubsubFutures: [PromiseFunction, ErrorFunction][] = [];
private pendingPushNotification: response.Response[] = [];
private readonly inflightRequestsLimit: number;
private readonly clientAz: string | undefined;
private config: BaseClientConfiguration | undefined;

protected configurePubsub(
Expand Down Expand Up @@ -7578,6 +7591,7 @@ export class BaseClient {
readFrom,
authenticationInfo,
inflightRequestsLimit: options.inflightRequestsLimit,
clientAz: options.clientAz ?? null,
};
}

Expand Down
24 changes: 22 additions & 2 deletions node/tests/GlideClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ const TIMEOUT = 50000;
describe("GlideClient", () => {
let testsFailed = 0;
let cluster: ValkeyCluster;
let azCluster: ValkeyCluster;
let client: GlideClient;
let azClient: GlideClient;
beforeAll(async () => {
const standaloneAddresses = global.STAND_ALONE_ENDPOINT;
cluster = standaloneAddresses
Expand All @@ -61,17 +63,28 @@ describe("GlideClient", () => {
getServerVersion,
)
: await ValkeyCluster.createCluster(false, 1, 1, getServerVersion);

azCluster = standaloneAddresses
? await ValkeyCluster.initFromExistingCluster(
false,
parseEndpoints(standaloneAddresses),
getServerVersion,
)
: await ValkeyCluster.createCluster(false, 1, 1, getServerVersion);
}, 20000);

afterEach(async () => {
await flushAndCloseClient(false, cluster.getAddresses(), client);
await flushAndCloseClient(false, azCluster.getAddresses(), azClient);
});

afterAll(async () => {
if (testsFailed === 0) {
await cluster.close();
await azCluster.close();
} else {
await cluster.close(true);
await azCluster.close();
}
}, TIMEOUT);

Expand Down Expand Up @@ -1500,18 +1513,25 @@ describe("GlideClient", () => {
}
},
);

runBaseTests({
init: async (protocol, configOverrides) => {
const config = getClientConfigurationOption(
cluster.getAddresses(),
protocol,
configOverrides,
);
client = await GlideClient.createClient(config);

const configNew = getClientConfigurationOption(
azCluster.getAddresses(),
protocol,
configOverrides,
);

testsFailed += 1;
azClient = await GlideClient.createClient(configNew);
client = await GlideClient.createClient(config);
return { client, cluster };
return { client, cluster, azClient, azCluster };
},
close: (testSucceeded: boolean) => {
if (testSucceeded) {
Expand Down
Loading

0 comments on commit 3c25ab9

Please sign in to comment.