Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve caching - adds TTL, cacheTransport and cacheGetKey #739

Merged
merged 4 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions lib/cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Simple in-memory cache implementation
* @class MemoryCache
* @property {Map} cache Cache map
*/
class MemoryCache {
constructor () {
this.cache = new Map();
}

/**
* Get cache value by key
* @param {string} key Cache key
* @return {any} Response from cache
*/
get (key) {
const cached = this.cache.get(key);
if (cached) {
if (cached.expiresAt > Date.now() || cached.expiresAt === 0) {
return cached.value;
}
this.cache.delete(key);
}
return undefined;
}

/**
* Set cache key with value and ttl
* @param {string} key Cache key
* @param {any} value Value to cache
* @param {number} ttl Time to live in milliseconds
* @return {void}
*/
set (key, value, ttl) {
this.cache.set(key, {
expiresAt: ttl,
value
});
}

/**
* Clear cache
* @returns {void}
*/
flush () {
this.cache.clear();
}
}

module.exports = exports = MemoryCache;
72 changes: 57 additions & 15 deletions lib/circuit.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const EventEmitter = require('events');
const Status = require('./status');
const Semaphore = require('./semaphore');
const MemoryCache = require('./cache');

const STATE = Symbol('state');
const OPEN = Symbol('open');
Expand All @@ -14,7 +15,6 @@ const FALLBACK_FUNCTION = Symbol('fallback');
const STATUS = Symbol('status');
const NAME = Symbol('name');
const GROUP = Symbol('group');
const CACHE = new WeakMap();
const ENABLED = Symbol('Enabled');
const WARMING_UP = Symbol('warming-up');
const VOLUME_THRESHOLD = Symbol('volume-threshold');
Expand Down Expand Up @@ -86,6 +86,15 @@ Please use options.errorThresholdPercentage`;
* has been cached that value will be returned for every subsequent execution:
* the cache can be cleared using `clearCache`. (The metrics `cacheHit` and
* `cacheMiss` reflect cache activity.) Default: false
* @param {Number} options.cacheTTL the time to live for the cache
* in milliseconds. Set 0 for infinity cache. Default: 0 (no TTL)
* @param {Function} options.cacheGetKey function that returns the key to use
* when caching the result of the circuit's fire.
* Better to use custom one, because `JSON.stringify` is not good
* from performance perspective.
* Default: `(...args) => JSON.stringify(args)`
* @param {CacheTransport} options.cacheTransport custom cache transport
* should implement `get`, `set` and `flush` methods.
* @param {AbortController} options.abortController this allows Opossum to
* signal upon timeout and properly abort your on going requests instead of
* leaving it in the background
Expand Down Expand Up @@ -146,6 +155,24 @@ class CircuitBreaker extends EventEmitter {
? options.capacity
: Number.MAX_SAFE_INTEGER;
this.options.errorFilter = options.errorFilter || (_ => false);
this.options.cacheTTL = options.cacheTTL ?? 0;
this.options.cacheGetKey = options.cacheGetKey ??
((...args) => JSON.stringify(args));

// Set default cache transport if not provided
if (this.options.cache) {
if (this.options.cacheTransport === undefined) {
this.options.cacheTransport = new MemoryCache();
} else if (typeof this.options.cacheTransport !== 'object' ||
!this.options.cacheTransport.get ||
!this.options.cacheTransport.set ||
!this.options.cacheTransport.flush
) {
throw new TypeError(
'options.cacheTransport should be an object with `get`, `set` and `flush` methods'
);
}
}

this.semaphore = new Semaphore(this.options.capacity);

Expand Down Expand Up @@ -275,9 +302,6 @@ class CircuitBreaker extends EventEmitter {
this.close();
}
});
if (this.options.cache) {
CACHE.set(this, undefined);
}

// Prepopulate the State of the Breaker
if (this[SHUTDOWN]) {
Expand Down Expand Up @@ -365,6 +389,9 @@ class CircuitBreaker extends EventEmitter {
}
this.status.shutdown();
this[STATE] = SHUTDOWN;

// clear cache on shutdown
this.clearCache();
}

/**
Expand Down Expand Up @@ -557,22 +584,29 @@ class CircuitBreaker extends EventEmitter {
}
const args = Array.prototype.slice.call(rest);

// Need to create variable here to prevent extra calls if cache is disabled
let cacheKey = '';

/**
* Emitted when the circuit breaker action is executed
* @event CircuitBreaker#fire
* @type {any} the arguments passed to the fired function
*/
this.emit('fire', args);

if (CACHE.get(this) !== undefined) {
/**
* Emitted when the circuit breaker is using the cache
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHit');
return CACHE.get(this);
} else if (this.options.cache) {
// If cache is enabled, check if we have a cached value
if (this.options.cache) {
cacheKey = this.options.cacheGetKey.apply(this, rest);
const cached = this.options.cacheTransport.get(cacheKey);
if (cached) {
/**
* Emitted when the circuit breaker is using the cache
* and finds a value.
* @event CircuitBreaker#cacheHit
*/
this.emit('cacheHit');
return cached;
}
/**
* Emitted when the circuit breaker does not find a value in
* the cache, but the cache option is enabled.
Expand Down Expand Up @@ -649,7 +683,13 @@ class CircuitBreaker extends EventEmitter {
this.semaphore.release();
resolve(result);
if (this.options.cache) {
CACHE.set(this, promise);
this.options.cacheTransport.set(
cacheKey,
promise,
this.options.cacheTTL > 0
? Date.now() + this.options.cacheTTL
: 0
);
}
}
})
Expand Down Expand Up @@ -686,7 +726,9 @@ class CircuitBreaker extends EventEmitter {
* @returns {void}
*/
clearCache () {
CACHE.set(this, undefined);
if (this.options.cache) {
this.options.cacheTransport.flush();
}
}

/**
Expand Down
170 changes: 170 additions & 0 deletions test/cache.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
'use strict';

const test = require('tape');
const CircuitBreaker = require('../');
const common = require('./common');

const passFail = common.passFail;

test('Using cache', t => {
t.plan(9);
const expected = 34;
const options = {
cache: true
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});

test('Using cache with TTL', t => {
t.plan(12);
const expected = 34;
const options = {
cache: true,
cacheTTL: 100
};
const breaker = new CircuitBreaker(passFail, options);

return breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
// wait 100ms for the cache to expire
.then(() => new Promise(resolve => setTimeout(resolve, 100)))
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 2, 'did not emit miss');
t.equals(stats.fires, 3, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(t.end)
.catch(t.fail);
});

test('Using cache with custom get cache key', t => {
t.plan(9);
const expected = 34;
const options = {
cache: true,
cacheGetKey: x => `key-${x}`
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});

test('Using cache with custom transport', t => {
t.plan(9);
const expected = 34;
const cache = new Map();
const options = {
cache: true,
cacheTransport: {
get: key => cache.get(key),
set: (key, value) => cache.set(key, value),
flush: () => cache.clear()
}
};
const breaker = new CircuitBreaker(passFail, options);

breaker.fire(expected)
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 0, 'does not hit the cache');
t.equals(stats.cacheMisses, 1, 'emits a cacheMiss');
t.equals(stats.fires, 1, 'fired once');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(stats.cacheHits, 1, 'hit the cache');
t.equals(stats.cacheMisses, 1, 'did not emit miss');
t.equals(stats.fires, 2, 'fired twice');
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
breaker.clearCache();
})
.then(() => breaker.fire(expected))
.then(arg => {
const stats = breaker.status.stats;
t.equals(arg, expected,
`cache hits:misses ${stats.cacheHits}:${stats.cacheMisses}`);
})
.then(() => breaker.shutdown())
.then(t.end)
.catch(t.fail);
});
Loading