From 5ed8890acf2b884f4b0c23e7b2c2429917458a31 Mon Sep 17 00:00:00 2001 From: Pooya Raki Date: Thu, 2 Jan 2025 18:21:48 +0100 Subject: [PATCH] Allow requests/startup to proceed when the Redis client is unavailable --- .../entities/__tests__/configuration.ts | 1 + src/config/entities/configuration.ts | 1 + .../accounts/accounts.datasource.ts | 15 +- .../counterfactual-safes.datasource.ts | 15 +- .../zerion-balances-api.service.ts | 15 +- src/datasources/cache/cache.module.ts | 16 ++- .../cache/cache.service.interface.ts | 4 +- .../redis.cache.service.key-prefix.spec.ts | 5 +- .../cache/redis.cache.service.spec.ts | 4 +- src/datasources/cache/redis.cache.service.ts | 132 +++++++++++++++--- src/domain/common/utils/promise.ts | 30 ++++ 11 files changed, 194 insertions(+), 44 deletions(-) create mode 100644 src/domain/common/utils/promise.ts diff --git a/src/config/entities/__tests__/configuration.ts b/src/config/entities/__tests__/configuration.ts index 7ccd74a340..6ddd4b1481 100644 --- a/src/config/entities/__tests__/configuration.ts +++ b/src/config/entities/__tests__/configuration.ts @@ -209,6 +209,7 @@ export default (): ReturnType => ({ pass: process.env.REDIS_PASS, host: process.env.REDIS_HOST || 'localhost', port: process.env.REDIS_PORT || '6379', + timeout: process.env.REDIS_TIMEOUT || 1 * 1_000, // Milliseconds }, relay: { baseUri: faker.internet.url({ appendSlash: false }), diff --git a/src/config/entities/configuration.ts b/src/config/entities/configuration.ts index f3f77c15d5..bb829208c5 100644 --- a/src/config/entities/configuration.ts +++ b/src/config/entities/configuration.ts @@ -313,6 +313,7 @@ export default () => ({ pass: process.env.REDIS_PASS, host: process.env.REDIS_HOST || 'localhost', port: process.env.REDIS_PORT || '6379', + timeout: process.env.REDIS_TIMEOUT || 2 * 1_000, // Milliseconds }, relay: { baseUri: diff --git a/src/datasources/accounts/accounts.datasource.ts b/src/datasources/accounts/accounts.datasource.ts index 600b530703..937b4e2ae4 100644 --- a/src/datasources/accounts/accounts.datasource.ts +++ b/src/datasources/accounts/accounts.datasource.ts @@ -253,12 +253,15 @@ export class AccountsDatasource implements IAccountsDatasource, OnModuleInit { `Invalid client IP while creating account: ${clientIp}`, ); } else { - const current = await this.cacheService.increment( - CacheRouter.getRateLimitCacheKey( - `${AccountsDatasource.ACCOUNT_CREATION_CACHE_PREFIX}_${clientIp}`, - ), - this.accountCreationRateLimitPeriodSeconds, - ); + const current = + (await this.cacheService.increment( + CacheRouter.getRateLimitCacheKey( + `${AccountsDatasource.ACCOUNT_CREATION_CACHE_PREFIX}_${clientIp}`, + ), + this.accountCreationRateLimitPeriodSeconds, + // If the current value cannot be retrieved from Redis (e.g., due to an error or timeout), + // we allow the user to proceed without blocking their operation. + )) ?? 0; if (current > this.accountCreationRateLimitCalls) { this.loggingService.warn( `Limit of ${this.accountCreationRateLimitCalls} reached for IP ${clientIp}`, diff --git a/src/datasources/accounts/counterfactual-safes/counterfactual-safes.datasource.ts b/src/datasources/accounts/counterfactual-safes/counterfactual-safes.datasource.ts index 94c0882b4a..5ec78c62fb 100644 --- a/src/datasources/accounts/counterfactual-safes/counterfactual-safes.datasource.ts +++ b/src/datasources/accounts/counterfactual-safes/counterfactual-safes.datasource.ts @@ -160,12 +160,15 @@ export class CounterfactualSafesDatasource } private async checkCreationRateLimit(account: Account): Promise { - const current = await this.cacheService.increment( - CacheRouter.getRateLimitCacheKey( - `${CounterfactualSafesDatasource.COUNTERFACTUAL_SAFES_CREATION_CACHE_PREFIX}_${account.address}`, - ), - this.counterfactualSafesCreationRateLimitPeriodSeconds, - ); + const current = + (await this.cacheService.increment( + CacheRouter.getRateLimitCacheKey( + `${CounterfactualSafesDatasource.COUNTERFACTUAL_SAFES_CREATION_CACHE_PREFIX}_${account.address}`, + ), + this.counterfactualSafesCreationRateLimitPeriodSeconds, + // If the current value cannot be retrieved from Redis (e.g., due to an error or timeout), + // we allow the user to proceed without blocking their operation. + )) ?? 0; if (current > this.counterfactualSafesCreationRateLimitCalls) { this.loggingService.warn( `Limit of ${this.counterfactualSafesCreationRateLimitCalls} reached for account ${account.address}`, diff --git a/src/datasources/balances-api/zerion-balances-api.service.ts b/src/datasources/balances-api/zerion-balances-api.service.ts index 60cb14a6f4..c754428ab2 100644 --- a/src/datasources/balances-api/zerion-balances-api.service.ts +++ b/src/datasources/balances-api/zerion-balances-api.service.ts @@ -380,12 +380,15 @@ export class ZerionBalancesApi implements IBalancesApi { } private async _checkRateLimit(): Promise { - const current = await this.cacheService.increment( - CacheRouter.getRateLimitCacheKey( - ZerionBalancesApi.RATE_LIMIT_CACHE_KEY_PREFIX, - ), - this.limitPeriodSeconds, - ); + const current = + (await this.cacheService.increment( + CacheRouter.getRateLimitCacheKey( + ZerionBalancesApi.RATE_LIMIT_CACHE_KEY_PREFIX, + ), + this.limitPeriodSeconds, + // If the current value cannot be retrieved from Redis (e.g., due to an error or timeout), + // we allow the user to proceed without blocking their operation. + )) ?? 0; if (current > this.limitCalls) throw new LimitReachedError(); } } diff --git a/src/datasources/cache/cache.module.ts b/src/datasources/cache/cache.module.ts index 4361e33e5e..8060fc6aec 100644 --- a/src/datasources/cache/cache.module.ts +++ b/src/datasources/cache/cache.module.ts @@ -6,6 +6,10 @@ import { RedisCacheService } from '@/datasources/cache/redis.cache.service'; import { CacheReadiness } from '@/domain/interfaces/cache-readiness.interface'; import { ILoggingService, LoggingService } from '@/logging/logging.interface'; import { CacheKeyPrefix } from '@/datasources/cache/constants'; +import { + PromiseTimeoutError, + promiseWithTimeout, +} from '@/domain/common/utils/promise'; export type RedisClientType = ReturnType; @@ -17,6 +21,7 @@ async function redisClientFactory( const redisPass = configurationService.get('redis.pass'); const redisHost = configurationService.getOrThrow('redis.host'); const redisPort = configurationService.getOrThrow('redis.port'); + const redisTimeout = configurationService.getOrThrow('redis.timeout'); const client: RedisClientType = createClient({ socket: { host: redisHost, @@ -28,7 +33,16 @@ async function redisClientFactory( client.on('error', (err) => loggingService.error(`Redis client error: ${err}`), ); - await client.connect(); + client.on('end', () => { + loggingService.error('Redis client terminated!'); + }); + try { + await promiseWithTimeout(client.connect(), redisTimeout); + } catch (error) { + if (error instanceof PromiseTimeoutError) { + loggingService.error('Redis connect timed out!'); + } + } return client; } diff --git a/src/datasources/cache/cache.service.interface.ts b/src/datasources/cache/cache.service.interface.ts index 2206a4bbdb..e931e07c7a 100644 --- a/src/datasources/cache/cache.service.interface.ts +++ b/src/datasources/cache/cache.service.interface.ts @@ -13,12 +13,12 @@ export interface ICacheService { hGet(cacheDir: CacheDir): Promise; - deleteByKey(key: string): Promise; + deleteByKey(key: string): Promise; increment( cacheKey: string, expireTimeSeconds: number | undefined, - ): Promise; + ): Promise; setCounter( key: string, diff --git a/src/datasources/cache/redis.cache.service.key-prefix.spec.ts b/src/datasources/cache/redis.cache.service.key-prefix.spec.ts index 67b4fdfd74..2de0ea643c 100644 --- a/src/datasources/cache/redis.cache.service.key-prefix.spec.ts +++ b/src/datasources/cache/redis.cache.service.key-prefix.spec.ts @@ -8,6 +8,7 @@ import type { IConfigurationService } from '@/config/configuration.service.inter import clearAllMocks = jest.clearAllMocks; const redisClientType = { + isReady: true, hGet: jest.fn(), hSet: jest.fn(), hDel: jest.fn(), @@ -37,10 +38,12 @@ describe('RedisCacheService with a Key Prefix', () => { beforeEach(() => { clearAllMocks(); - defaultExpirationTimeInSeconds = faker.number.int(); + defaultExpirationTimeInSeconds = faker.number.int({ min: 1, max: 3600 }); mockConfigurationService.getOrThrow.mockImplementation((key) => { if (key === 'expirationTimeInSeconds.default') { return defaultExpirationTimeInSeconds; + } else if (key === 'redis.timeout') { + return defaultExpirationTimeInSeconds * 1_000; } throw Error(`Unexpected key: ${key}`); }); diff --git a/src/datasources/cache/redis.cache.service.spec.ts b/src/datasources/cache/redis.cache.service.spec.ts index 4c946ae770..444c0a967b 100644 --- a/src/datasources/cache/redis.cache.service.spec.ts +++ b/src/datasources/cache/redis.cache.service.spec.ts @@ -38,10 +38,12 @@ describe('RedisCacheService', () => { beforeEach(async () => { clearAllMocks(); await redisClient.flushDb(); - defaultExpirationTimeInSeconds = faker.number.int(); + defaultExpirationTimeInSeconds = faker.number.int({ min: 1, max: 3600 }); mockConfigurationService.getOrThrow.mockImplementation((key) => { if (key === 'expirationTimeInSeconds.default') { return defaultExpirationTimeInSeconds; + } else if (key === 'redis.timeout') { + return defaultExpirationTimeInSeconds * 1_000; } throw Error(`Unexpected key: ${key}`); }); diff --git a/src/datasources/cache/redis.cache.service.ts b/src/datasources/cache/redis.cache.service.ts index ca80d23db4..dd520db835 100644 --- a/src/datasources/cache/redis.cache.service.ts +++ b/src/datasources/cache/redis.cache.service.ts @@ -6,6 +6,10 @@ import { ICacheReadiness } from '@/domain/interfaces/cache-readiness.interface'; import { ILoggingService, LoggingService } from '@/logging/logging.interface'; import { IConfigurationService } from '@/config/configuration.service.interface'; import { CacheKeyPrefix } from '@/datasources/cache/constants'; +import { + PromiseTimeoutError, + promiseWithTimeout, +} from '@/domain/common/utils/promise'; @Injectable() export class RedisCacheService @@ -31,7 +35,17 @@ export class RedisCacheService return this.client.ping(); } + ready(): boolean { + return this.client.isReady; + } + async getCounter(key: string): Promise { + if (!this.ready()) { + this.logRedisClientUnreadyState('getCounter'); + + return null; + } + const value = await this.client.get(this._prefixKey(key)); const numericValue = Number(value); return Number.isInteger(numericValue) ? numericValue : null; @@ -46,32 +60,52 @@ export class RedisCacheService return; } + if (!this.ready()) { + this.logRedisClientUnreadyState('hSet'); + + return undefined; + } + const key = this._prefixKey(cacheDir.key); try { - await this.client.hSet(key, cacheDir.field, value); + await this.timeout(this.client.hSet(key, cacheDir.field, value)); // NX - Set expiry only when the key has no expiry // See https://redis.io/commands/expire/ - await this.client.expire(key, expireTimeSeconds, 'NX'); + await this.timeout(this.client.expire(key, expireTimeSeconds, 'NX')); } catch (error) { - await this.client.hDel(key, cacheDir.field); + await this.timeout(this.client.hDel(key, cacheDir.field)); throw error; } } async hGet(cacheDir: CacheDir): Promise { + if (!this.ready()) { + this.logRedisClientUnreadyState('hGet'); + + return undefined; + } + const key = this._prefixKey(cacheDir.key); - return await this.client.hGet(key, cacheDir.field); + return await this.timeout(this.client.hGet(key, cacheDir.field)); } - async deleteByKey(key: string): Promise { + async deleteByKey(key: string): Promise { + if (!this.ready()) { + this.logRedisClientUnreadyState('deleteByKey'); + + return undefined; + } + const keyWithPrefix = this._prefixKey(key); // see https://redis.io/commands/unlink/ - const result = await this.client.unlink(keyWithPrefix); - await this.hSet( - new CacheDir(`invalidationTimeMs:${key}`, ''), - Date.now().toString(), - this.defaultExpirationTimeInSeconds, + const result = await this.timeout(this.client.unlink(keyWithPrefix)); + await this.timeout( + this.hSet( + new CacheDir(`invalidationTimeMs:${key}`, ''), + Date.now().toString(), + this.defaultExpirationTimeInSeconds, + ), ); return result; } @@ -79,7 +113,13 @@ export class RedisCacheService async increment( cacheKey: string, expireTimeSeconds: number | undefined, - ): Promise { + ): Promise { + if (!this.ready()) { + this.logRedisClientUnreadyState('increment'); + + return undefined; + } + const transaction = this.client.multi().incr(cacheKey); if (expireTimeSeconds !== undefined && expireTimeSeconds > 0) { transaction.expire(cacheKey, expireTimeSeconds, 'NX'); @@ -93,6 +133,12 @@ export class RedisCacheService value: number, expireTimeSeconds: number, ): Promise { + if (!this.ready()) { + this.logRedisClientUnreadyState('setCounter'); + + return undefined; + } + await this.client.set(key, value, { EX: expireTimeSeconds, NX: true }); } @@ -121,21 +167,65 @@ export class RedisCacheService * instance is not responding it invokes {@link forceQuit}. */ async onModuleDestroy(): Promise { - this.loggingService.info('Closing Redis connection'); - const forceQuitTimeout = setTimeout(() => { - this.forceQuit.bind(this); - }, this.quitTimeoutInSeconds * 1000); - await this.client.quit(); - clearTimeout(forceQuitTimeout); - this.loggingService.info('Redis connection closed'); + if (!this.ready()) { + this.logRedisClientUnreadyState('onModuleDestroy'); + + return undefined; + } + this.loggingService.info('Closing Redis connection...'); + try { + await promiseWithTimeout( + this.client.quit(), + this.quitTimeoutInSeconds * 1000, + ); + this.loggingService.info('Redis connection closed'); + } catch (error) { + if (error instanceof PromiseTimeoutError) { + await this.forceQuit(); + } + } } /** * Forces the closing of the Redis connection associated with this service. */ private async forceQuit(): Promise { - this.loggingService.warn('Forcing Redis connection close'); - await this.client.disconnect(); - this.loggingService.warn('Redis connection closed'); + if (!this.ready()) { + this.logRedisClientUnreadyState('forceQuit'); + + return undefined; + } + this.loggingService.warn('Forcing Redis connection to close...'); + try { + await this.client.disconnect(); + this.loggingService.warn('Redis connection forcefully closed!'); + } catch (error) { + this.loggingService.error(`Cannot close Redis connection: ${error}`); + } + } + + private async timeout( + queryObject: Promise, + timeout?: number, + ): Promise { + timeout = + timeout ?? this.configurationService.getOrThrow('redis.timeout'); + try { + return await promiseWithTimeout(queryObject, timeout); + } catch (error) { + if (error instanceof PromiseTimeoutError) { + this.loggingService.error('Redis Query Timed out!'); + + return undefined; + } + + throw error; + } + } + + private logRedisClientUnreadyState(operation: string): void { + this.loggingService.error( + `Redis client is not ready. Redis ${operation} failed!`, + ); } } diff --git a/src/domain/common/utils/promise.ts b/src/domain/common/utils/promise.ts new file mode 100644 index 0000000000..b6bf6aa438 --- /dev/null +++ b/src/domain/common/utils/promise.ts @@ -0,0 +1,30 @@ +import { HttpException, HttpStatus } from '@nestjs/common'; + +export const promiseWithTimeout = ( + promise: Promise, + timeout: number, +): Promise => { + let timeoutId: NodeJS.Timeout; + + return Promise.race([ + promise, + new Promise((_, reject) => { + timeoutId = setTimeout( + () => reject(new PromiseTimeoutError('Promise timed out!', 500)), + timeout, + ); + }), + ]).finally(() => { + clearTimeout(timeoutId); + }); +}; + +export class PromiseTimeoutError extends HttpException { + // + public constructor( + message: string, + statusCode: number = HttpStatus.SERVICE_UNAVAILABLE, + ) { + super(message, statusCode); + } +}