Skip to content

Commit

Permalink
Allow requests/startup to proceed when the Redis client is unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
PooyaRaki committed Jan 8, 2025
1 parent 79ebc89 commit 5ed8890
Show file tree
Hide file tree
Showing 11 changed files with 194 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/config/entities/__tests__/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ export default (): ReturnType<typeof configuration> => ({
pass: process.env.REDIS_PASS,
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || '6379',
timeout: process.env.REDIS_TIMEOUT || 1 * 1_000, // Milliseconds
},
relay: {
baseUri: faker.internet.url({ appendSlash: false }),
Expand Down
1 change: 1 addition & 0 deletions src/config/entities/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ export default () => ({
pass: process.env.REDIS_PASS,
host: process.env.REDIS_HOST || 'localhost',
port: process.env.REDIS_PORT || '6379',
timeout: process.env.REDIS_TIMEOUT || 2 * 1_000, // Milliseconds
},
relay: {
baseUri:
Expand Down
15 changes: 9 additions & 6 deletions src/datasources/accounts/accounts.datasource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,15 @@ export class AccountsDatasource implements IAccountsDatasource, OnModuleInit {
`Invalid client IP while creating account: ${clientIp}`,
);
} else {
const current = await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
`${AccountsDatasource.ACCOUNT_CREATION_CACHE_PREFIX}_${clientIp}`,
),
this.accountCreationRateLimitPeriodSeconds,
);
const current =
(await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
`${AccountsDatasource.ACCOUNT_CREATION_CACHE_PREFIX}_${clientIp}`,
),
this.accountCreationRateLimitPeriodSeconds,
// If the current value cannot be retrieved from Redis (e.g., due to an error or timeout),
// we allow the user to proceed without blocking their operation.
)) ?? 0;
if (current > this.accountCreationRateLimitCalls) {
this.loggingService.warn(
`Limit of ${this.accountCreationRateLimitCalls} reached for IP ${clientIp}`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,15 @@ export class CounterfactualSafesDatasource
}

private async checkCreationRateLimit(account: Account): Promise<void> {
const current = await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
`${CounterfactualSafesDatasource.COUNTERFACTUAL_SAFES_CREATION_CACHE_PREFIX}_${account.address}`,
),
this.counterfactualSafesCreationRateLimitPeriodSeconds,
);
const current =
(await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
`${CounterfactualSafesDatasource.COUNTERFACTUAL_SAFES_CREATION_CACHE_PREFIX}_${account.address}`,
),
this.counterfactualSafesCreationRateLimitPeriodSeconds,
// If the current value cannot be retrieved from Redis (e.g., due to an error or timeout),
// we allow the user to proceed without blocking their operation.
)) ?? 0;
if (current > this.counterfactualSafesCreationRateLimitCalls) {
this.loggingService.warn(
`Limit of ${this.counterfactualSafesCreationRateLimitCalls} reached for account ${account.address}`,
Expand Down
15 changes: 9 additions & 6 deletions src/datasources/balances-api/zerion-balances-api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,15 @@ export class ZerionBalancesApi implements IBalancesApi {
}

private async _checkRateLimit(): Promise<void> {
const current = await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
ZerionBalancesApi.RATE_LIMIT_CACHE_KEY_PREFIX,
),
this.limitPeriodSeconds,
);
const current =
(await this.cacheService.increment(
CacheRouter.getRateLimitCacheKey(
ZerionBalancesApi.RATE_LIMIT_CACHE_KEY_PREFIX,
),
this.limitPeriodSeconds,
// If the current value cannot be retrieved from Redis (e.g., due to an error or timeout),
// we allow the user to proceed without blocking their operation.
)) ?? 0;
if (current > this.limitCalls) throw new LimitReachedError();
}
}
16 changes: 15 additions & 1 deletion src/datasources/cache/cache.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import { RedisCacheService } from '@/datasources/cache/redis.cache.service';
import { CacheReadiness } from '@/domain/interfaces/cache-readiness.interface';
import { ILoggingService, LoggingService } from '@/logging/logging.interface';
import { CacheKeyPrefix } from '@/datasources/cache/constants';
import {
PromiseTimeoutError,
promiseWithTimeout,
} from '@/domain/common/utils/promise';

export type RedisClientType = ReturnType<typeof createClient>;

Expand All @@ -17,6 +21,7 @@ async function redisClientFactory(
const redisPass = configurationService.get<string>('redis.pass');
const redisHost = configurationService.getOrThrow<string>('redis.host');
const redisPort = configurationService.getOrThrow<string>('redis.port');
const redisTimeout = configurationService.getOrThrow<number>('redis.timeout');
const client: RedisClientType = createClient({
socket: {
host: redisHost,
Expand All @@ -28,7 +33,16 @@ async function redisClientFactory(
client.on('error', (err) =>
loggingService.error(`Redis client error: ${err}`),
);
await client.connect();
client.on('end', () => {
loggingService.error('Redis client terminated!');
});
try {
await promiseWithTimeout(client.connect(), redisTimeout);
} catch (error) {
if (error instanceof PromiseTimeoutError) {
loggingService.error('Redis connect timed out!');
}
}
return client;
}

Expand Down
4 changes: 2 additions & 2 deletions src/datasources/cache/cache.service.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export interface ICacheService {

hGet(cacheDir: CacheDir): Promise<string | undefined>;

deleteByKey(key: string): Promise<number>;
deleteByKey(key: string): Promise<number | undefined>;

increment(
cacheKey: string,
expireTimeSeconds: number | undefined,
): Promise<number>;
): Promise<number | undefined>;

setCounter(
key: string,
Expand Down
5 changes: 4 additions & 1 deletion src/datasources/cache/redis.cache.service.key-prefix.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { IConfigurationService } from '@/config/configuration.service.inter
import clearAllMocks = jest.clearAllMocks;

const redisClientType = {
isReady: true,
hGet: jest.fn(),
hSet: jest.fn(),
hDel: jest.fn(),
Expand Down Expand Up @@ -37,10 +38,12 @@ describe('RedisCacheService with a Key Prefix', () => {

beforeEach(() => {
clearAllMocks();
defaultExpirationTimeInSeconds = faker.number.int();
defaultExpirationTimeInSeconds = faker.number.int({ min: 1, max: 3600 });
mockConfigurationService.getOrThrow.mockImplementation((key) => {
if (key === 'expirationTimeInSeconds.default') {
return defaultExpirationTimeInSeconds;
} else if (key === 'redis.timeout') {
return defaultExpirationTimeInSeconds * 1_000;
}
throw Error(`Unexpected key: ${key}`);
});
Expand Down
4 changes: 3 additions & 1 deletion src/datasources/cache/redis.cache.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,12 @@ describe('RedisCacheService', () => {
beforeEach(async () => {
clearAllMocks();
await redisClient.flushDb();
defaultExpirationTimeInSeconds = faker.number.int();
defaultExpirationTimeInSeconds = faker.number.int({ min: 1, max: 3600 });
mockConfigurationService.getOrThrow.mockImplementation((key) => {
if (key === 'expirationTimeInSeconds.default') {
return defaultExpirationTimeInSeconds;
} else if (key === 'redis.timeout') {
return defaultExpirationTimeInSeconds * 1_000;
}
throw Error(`Unexpected key: ${key}`);
});
Expand Down
132 changes: 111 additions & 21 deletions src/datasources/cache/redis.cache.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import { ICacheReadiness } from '@/domain/interfaces/cache-readiness.interface';
import { ILoggingService, LoggingService } from '@/logging/logging.interface';
import { IConfigurationService } from '@/config/configuration.service.interface';
import { CacheKeyPrefix } from '@/datasources/cache/constants';
import {
PromiseTimeoutError,
promiseWithTimeout,
} from '@/domain/common/utils/promise';

@Injectable()
export class RedisCacheService
Expand All @@ -31,7 +35,17 @@ export class RedisCacheService
return this.client.ping();
}

ready(): boolean {
return this.client.isReady;
}

async getCounter(key: string): Promise<number | null> {
if (!this.ready()) {
this.logRedisClientUnreadyState('getCounter');

return null;
}

const value = await this.client.get(this._prefixKey(key));
const numericValue = Number(value);
return Number.isInteger(numericValue) ? numericValue : null;
Expand All @@ -46,40 +60,66 @@ export class RedisCacheService
return;
}

if (!this.ready()) {
this.logRedisClientUnreadyState('hSet');

return undefined;
}

const key = this._prefixKey(cacheDir.key);

try {
await this.client.hSet(key, cacheDir.field, value);
await this.timeout(this.client.hSet(key, cacheDir.field, value));
// NX - Set expiry only when the key has no expiry
// See https://redis.io/commands/expire/
await this.client.expire(key, expireTimeSeconds, 'NX');
await this.timeout(this.client.expire(key, expireTimeSeconds, 'NX'));
} catch (error) {
await this.client.hDel(key, cacheDir.field);
await this.timeout(this.client.hDel(key, cacheDir.field));
throw error;
}
}

async hGet(cacheDir: CacheDir): Promise<string | undefined> {
if (!this.ready()) {
this.logRedisClientUnreadyState('hGet');

return undefined;
}

const key = this._prefixKey(cacheDir.key);
return await this.client.hGet(key, cacheDir.field);
return await this.timeout(this.client.hGet(key, cacheDir.field));
}

async deleteByKey(key: string): Promise<number> {
async deleteByKey(key: string): Promise<number | undefined> {
if (!this.ready()) {
this.logRedisClientUnreadyState('deleteByKey');

return undefined;
}

const keyWithPrefix = this._prefixKey(key);
// see https://redis.io/commands/unlink/
const result = await this.client.unlink(keyWithPrefix);
await this.hSet(
new CacheDir(`invalidationTimeMs:${key}`, ''),
Date.now().toString(),
this.defaultExpirationTimeInSeconds,
const result = await this.timeout(this.client.unlink(keyWithPrefix));
await this.timeout(
this.hSet(
new CacheDir(`invalidationTimeMs:${key}`, ''),
Date.now().toString(),
this.defaultExpirationTimeInSeconds,
),
);
return result;
}

async increment(
cacheKey: string,
expireTimeSeconds: number | undefined,
): Promise<number> {
): Promise<number | undefined> {
if (!this.ready()) {
this.logRedisClientUnreadyState('increment');

return undefined;
}

const transaction = this.client.multi().incr(cacheKey);
if (expireTimeSeconds !== undefined && expireTimeSeconds > 0) {
transaction.expire(cacheKey, expireTimeSeconds, 'NX');
Expand All @@ -93,6 +133,12 @@ export class RedisCacheService
value: number,
expireTimeSeconds: number,
): Promise<void> {
if (!this.ready()) {
this.logRedisClientUnreadyState('setCounter');

return undefined;
}

await this.client.set(key, value, { EX: expireTimeSeconds, NX: true });
}

Expand Down Expand Up @@ -121,21 +167,65 @@ export class RedisCacheService
* instance is not responding it invokes {@link forceQuit}.
*/
async onModuleDestroy(): Promise<void> {
this.loggingService.info('Closing Redis connection');
const forceQuitTimeout = setTimeout(() => {
this.forceQuit.bind(this);
}, this.quitTimeoutInSeconds * 1000);
await this.client.quit();
clearTimeout(forceQuitTimeout);
this.loggingService.info('Redis connection closed');
if (!this.ready()) {
this.logRedisClientUnreadyState('onModuleDestroy');

return undefined;
}
this.loggingService.info('Closing Redis connection...');
try {
await promiseWithTimeout(
this.client.quit(),
this.quitTimeoutInSeconds * 1000,
);
this.loggingService.info('Redis connection closed');
} catch (error) {
if (error instanceof PromiseTimeoutError) {
await this.forceQuit();
}
}
}

/**
* Forces the closing of the Redis connection associated with this service.
*/
private async forceQuit(): Promise<void> {
this.loggingService.warn('Forcing Redis connection close');
await this.client.disconnect();
this.loggingService.warn('Redis connection closed');
if (!this.ready()) {
this.logRedisClientUnreadyState('forceQuit');

return undefined;
}
this.loggingService.warn('Forcing Redis connection to close...');
try {
await this.client.disconnect();
this.loggingService.warn('Redis connection forcefully closed!');
} catch (error) {
this.loggingService.error(`Cannot close Redis connection: ${error}`);
}
}

private async timeout<T>(
queryObject: Promise<T>,
timeout?: number,
): Promise<T | undefined> {
timeout =
timeout ?? this.configurationService.getOrThrow<number>('redis.timeout');
try {
return await promiseWithTimeout(queryObject, timeout);
} catch (error) {
if (error instanceof PromiseTimeoutError) {
this.loggingService.error('Redis Query Timed out!');

return undefined;
}

throw error;
}
}

private logRedisClientUnreadyState(operation: string): void {
this.loggingService.error(
`Redis client is not ready. Redis ${operation} failed!`,
);
}
}
30 changes: 30 additions & 0 deletions src/domain/common/utils/promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { HttpException, HttpStatus } from '@nestjs/common';

export const promiseWithTimeout = <T>(
promise: Promise<T>,
timeout: number,
): Promise<T | undefined> => {
let timeoutId: NodeJS.Timeout;

return Promise.race<T | undefined>([
promise,
new Promise((_, reject) => {
timeoutId = setTimeout(
() => reject(new PromiseTimeoutError('Promise timed out!', 500)),
timeout,
);
}),
]).finally(() => {
clearTimeout(timeoutId);
});
};

export class PromiseTimeoutError extends HttpException {
//
public constructor(
message: string,
statusCode: number = HttpStatus.SERVICE_UNAVAILABLE,
) {
super(message, statusCode);
}
}

0 comments on commit 5ed8890

Please sign in to comment.