diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js
index 9e20ee6..7549a5a 100644
--- a/lib/interceptor/cache.js
+++ b/lib/interceptor/cache.js
@@ -1,19 +1,21 @@
 import assert from 'node:assert'
-// import { LRUCache } from 'lru-cache'
 import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
+import { DatabaseSync } from 'node:sqlite' // requires the --experimental-sqlite flag
 
 class CacheHandler extends DecoratorHandler {
   #handler
   #store
   #key
+  #opts
   #value = null
 
-  constructor({ key, handler, store }) {
+  constructor({ key, handler, store, opts = {} }) {
     super(handler)
     this.#key = key
     this.#handler = handler
     this.#store = store
+    this.#opts = opts
   }
 
   onConnect(abort) {
@@ -64,11 +66,10 @@ class CacheHandler extends DecoratorHandler {
           (rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
           (statusMessage?.length ?? 0) +
           64,
-        ttl: ttl * 1e3,
+        ttl, // ttl is already in milliseconds
       }
     }
   }
-
   return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
 }
 
@@ -88,20 +89,16 @@ class CacheHandler extends DecoratorHandler {
 
   onComplete(rawTrailers) {
     if (this.#value) {
-      // get
-      const entries = this.#handler.entries
       const resHeaders = parseHeaders(this.#value.data.rawHeaders)
-      const reqHeaders = this.#handler.opts
+      const reqHeaders = this.#opts
 
-      // set
       this.#value.data.rawTrailers = rawTrailers
       this.#value.size = this.#value.size
         ? this.#value.size + rawTrailers?.reduce((xs, x) => xs + x.length, 0)
         : 0
       this.#value.vary = formatVaryData(resHeaders, reqHeaders)
-      entries.push(this.#value)
-      sortEntriesByVary(entries)
-      this.#store.set(this.#key, entries)
+
+      this.#store.set(this.#key, this.#value)
     }
     return this.#handler.onComplete(rawTrailers)
   }
@@ -115,20 +112,72 @@ function formatVaryData(resHeaders, reqHeaders) {
     .filter(([_key, val]) => val)
 }
 
-// TODO (fix): Async filesystem cache.
+// TODO: consider moving this class to utils.js or its own module.
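+//
+// CacheStore keeps cached responses in a shared in-memory SQLite database:
+// one row per response variant, keyed by `method:path`. `data` and `vary`
+// are JSON-encoded TEXT columns, and a row expires once
+// `insertTime + ttl < Date.now()` (all times in milliseconds).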
 class CacheStore {
-  constructor({ maxSize = 1024 * 1024, maxEntrySize = 128 * 1024 }) {
-    this.maxSize = maxSize
-    this.maxEntrySize = maxEntrySize
-    this.cache = new Map()
+  constructor() {
+    this.database = null
+    this.init()
   }
 
-  set(key, value, opts) {
-    this.cache.set(key, value)
+  init() {
+    this.database = new DatabaseSync('file:memdb1?mode=memory&cache=shared')
+
+    this.database.exec(`
+      CREATE TABLE IF NOT EXISTS cacheInterceptor(
+        key TEXT,
+        data TEXT,
+        vary TEXT,
+        size INTEGER,
+        ttl INTEGER,
+        insertTime INTEGER
+      ) STRICT
+    `)
+  }
+
+  set(key, entry, opts) {
+    if (!this.database) {
+      throw new Error('Database not initialized')
+    }
+
+    // Serialize nested fields for the TEXT columns (mutates the passed entry)
+    entry.data = JSON.stringify(entry.data)
+    entry.vary = JSON.stringify(entry.vary)
+
+    const insert = this.database.prepare(
+      'INSERT INTO cacheInterceptor (key, data, vary, size, ttl, insertTime) VALUES (?, ?, ?, ?, ?, ?)',
+    )
+
+    insert.run(key, entry.data, entry.vary, entry.size, entry.ttl, Date.now())
+
+    this.purge()
   }
 
   get(key) {
-    return this.cache.get(key)
+    if (!this.database) {
+      throw new Error('Database not initialized')
+    }
+    this.purge()
+    const query = this.database.prepare('SELECT * FROM cacheInterceptor WHERE key = ?')
+    const rows = query.all(key)
+    rows.forEach((i) => {
+      i.data = JSON.parse(i.data)
+      i.vary = JSON.parse(i.vary)
+    })
+
+    // Filter out any expired rows that purge() may not have removed yet
+    const nonExpiredRows = rows.filter((i) => i.insertTime + i.ttl > Date.now())
+
+    return nonExpiredRows
+  }
+
+  purge() {
+    // Drop every expired row, across all keys.
+    const query = this.database.prepare('DELETE FROM cacheInterceptor WHERE insertTime + ttl < ?')
+    query.run(Date.now())
+  }
+
+  deleteAll() {
+    const query = this.database.prepare('DELETE FROM cacheInterceptor')
+    query.run()
   }
 }
 
@@ -151,12 +200,12 @@ function findEntryByHeaders(entries, reqHeaders) {
   return entries?.find(
     (entry) =>
       entry.vary?.every(([key, val]) => {
-        return reqHeaders[key] === val
+        return reqHeaders?.headers?.[key] === val
       }) ?? true,
   )
 }
 
-const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
+const DEFAULT_CACHE_STORE = new CacheStore()
 
 export default (opts) => (dispatch) => (opts, handler) => {
   if (!opts.cache || opts.upgrade) {
@@ -189,6 +238,11 @@ export default (opts) => (dispatch) => (opts, handler) => {
 
   opts.host = opts.host ?? new URL(opts.origin).host
 
+  if (!opts.headers) {
+    opts.headers = {}
+  }
+
+  // Idea: fall back to DEFAULT_CACHE_STORE even when no store is passed, since composing the cache interceptor already signals that caching is wanted.
   const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache
 
   if (!store) {
@@ -196,7 +250,7 @@ export default (opts) => (dispatch) => (opts, handler) => {
   }
 
   let key = `${opts.method}:${opts.path}`
-  console.log('getting key: ' + key)
+
   let entries = store.get(key)
 
   if (Array.isArray(entries) && entries.length === 0 && opts.method === 'HEAD') {
@@ -219,11 +273,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
     try {
       handler.onConnect(abort)
       signal.throwIfAborted()
+
       handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
       signal.throwIfAborted()
+
      if (opts.method !== 'HEAD') {
        for (const chunk of body) {
          const ret = handler.onData(chunk)
+          signal.throwIfAborted()
          if (ret === false) {
            // TODO (fix): back pressure...
@@ -239,8 +296,6 @@ export default (opts) => (dispatch) => (opts, handler) => {
 
     return true
   } else {
-    handler.opts = opts
-    handler.entries = entries
-    return dispatch(opts, new CacheHandler({ handler, store, key }))
+    return dispatch(opts, new CacheHandler({ handler, store, key, opts }))
   }
 }
diff --git a/package.json b/package.json
index c283924..b46a4ab 100644
--- a/package.json
+++ b/package.json
@@ -12,7 +12,6 @@
     "cache-control-parser": "^2.0.6",
     "cacheable-lookup": "^7.0.0",
     "http-errors": "^2.0.0",
-    "lru-cache": "^11.0.0",
     "undici": "^6.19.5"
   },
   "devDependencies": {
diff --git a/test/cache.js b/test/cache.js
index a04d691..0c731d7 100644
--- a/test/cache.js
+++ b/test/cache.js
@@ -2,26 +2,82 @@
 import { test } from 'tap'
 import { createServer } from 'node:http'
 import undici from 'undici'
 import { interceptors } from '../lib/index.js'
+import { DatabaseSync } from 'node:sqlite'
 
-// Placeholder until we implement a better LRU Cache
 class CacheStore {
   constructor() {
-    this.cache = new Map()
+    this.database = null
+    this.init()
   }
 
-  set(key, value) {
-    this.cache.set(key, value)
+  init() {
+    this.database = new DatabaseSync('file:memdb1?mode=memory&cache=shared')
+
+    this.database.exec(`
+      CREATE TABLE IF NOT EXISTS cacheInterceptor(
+        key TEXT,
+        data TEXT,
+        vary TEXT,
+        size INTEGER,
+        ttl INTEGER,
+        insertTime INTEGER
+      ) STRICT
+    `)
+  }
+
+  set(key, entry) {
+    if (!this.database) {
+      throw new Error('Database not initialized')
+    }
+
+    // Serialize nested fields for the TEXT columns (mutates the passed entry)
+    entry.data = JSON.stringify(entry.data)
+    entry.vary = JSON.stringify(entry.vary)
+
+    const insert = this.database.prepare(
+      'INSERT INTO cacheInterceptor (key, data, vary, size, ttl, insertTime) VALUES (?, ?, ?, ?, ?, ?)',
+    )
+
+    insert.run(key, entry.data, entry.vary, entry.size, entry.ttl, Date.now())
+
+    this.purge()
   }
 
   get(key) {
-    return this.cache.get(key)
+    if (!this.database) {
+      throw new Error('Database not initialized')
+    }
+    this.purge()
+    const query = this.database.prepare('SELECT * FROM cacheInterceptor WHERE key = ?')
+    const rows = query.all(key)
+    rows.forEach((i) => {
+      i.data = JSON.parse(i.data)
+      i.vary = JSON.parse(i.vary)
+    })
+
+    // Filter out any expired rows that purge() may not have removed yet
+    const nonExpiredRows = rows.filter((i) => i.insertTime + i.ttl > Date.now())
+
+    return nonExpiredRows
   }
-}
 
-async function exampleCache() {
-  const cache = new CacheStore()
+  purge() {
+    if (!this.database) {
+      throw new Error('Database not initialized')
+    }
+    const query = this.database.prepare('DELETE FROM cacheInterceptor WHERE insertTime + ttl < ?')
+    query.run(Date.now())
+  }
+
+  deleteAll() {
+    const query = this.database.prepare('DELETE FROM cacheInterceptor')
+    query.run()
+  }
+}
 
-  const rawHeaders = [
+function exampleEntries() {
+  const rawHeaders1 = [
     Buffer.from('Content-Type'),
     Buffer.from('application/json'),
     Buffer.from('Content-Length'),
@@ -29,14 +85,26 @@ async function exampleCache() {
     Buffer.from('Cache-Control'),
     Buffer.from('public'),
   ]
+  const rawHeaders2 = [
+    Buffer.from('Accept'),
+    Buffer.from('application/txt'),
+    Buffer.from('Content-Length'),
+    Buffer.from('4'),
+    Buffer.from('origin2'),
+    Buffer.from('www.google.com/images'),
+    Buffer.from('User-Agent'),
+    Buffer.from('Chrome'),
+    Buffer.from('Cache-Control'),
+    Buffer.from('public'),
+  ]
 
   const entries = [
     {
       data: {
         statusCode: 200,
         statusMessage: '',
-        rawHeaders,
-        rawTrailers: ['Hello'],
+        rawHeaders: rawHeaders1,
+        rawTrailers: ['Hello', 'world'],
         body: ['asd1'],
       },
       vary: [
@@ -50,8 +118,8 @@ async function exampleCache() {
       data: {
         statusCode: 200,
         statusMessage: '',
-        rawHeaders,
-        rawTrailers: ['Hello'],
+        rawHeaders: rawHeaders2,
+        rawTrailers: ['Hello', 'world'],
         body: ['asd2'],
       },
       vary: [
@@ -69,7 +137,7 @@ async function exampleCache() {
       data: {
         statusCode: 200,
         statusMessage: 'first',
-        rawHeaders,
+        rawHeaders: rawHeaders1,
         rawTrailers: ['Hello'],
         body: ['asd4'],
       },
@@ -82,11 +150,28 @@ async function exampleCache() {
       size: 100,
       ttl: 31556952000,
     },
+    {
+      data: {
+        statusCode: 200,
+        statusMessage: 'to be purged',
+        rawHeaders: rawHeaders1,
+        rawTrailers: ['Hello'],
+        body: ['asd4'],
+      },
+      vary: [
+        ['Accept', 'application/json'],
+        ['User-Agent', 'Mozilla/5.0'],
+        ['host2', 'www.google.com'],
+        ['origin2', 'www.google.com/images'],
+      ],
+      size: 100,
+      ttl: 1,
+    },
   ]
 
-  cache.set('GET:/', entries)
-  return cache
+  return entries
 }
 
+// Note: this test can be flaky, since it depends on how quickly the in-memory database executes each operation relative to the entry TTLs.
 test('cache request, no matching entry found. Store response in cache', async (t) => {
   t.plan(4)
   const server = createServer((req, res) => {
@@ -103,42 +188,41 @@ test('cache request, no matching entry found. Store response in cache', async (t
 
   t.teardown(server.close.bind(server))
 
-  const cache = await exampleCache()
+  const cache = new CacheStore()
 
-  console.log('Cache before first request:')
-  console.log({ cache: cache.cache })
+  // populate the cache with the example entries
+  cache.deleteAll()
+  exampleEntries().forEach((i) => cache.set('GET:/', i))
 
   const cacheLength1 = cache.get('GET:/').length
 
-  console.log({ cacheLength1 })
-
+  // With `cache: true` the interceptor writes through DEFAULT_CACHE_STORE;
+  // this local store still sees those rows because both open the same
+  // shared in-memory database (cache=shared).
   server.listen(0, async () => {
     const serverPort = server.address().port
     // response not found in cache, response should be added to cache.
     const response = await undici.request(`http://0.0.0.0:${serverPort}`, {
       dispatcher: new undici.Agent().compose(interceptors.cache()),
-      cache,
+      cache: true,
     })
     let str = ''
    for await (const chunk of response.body) {
      str += chunk
    }
 
    const cacheLength2 = cache.get('GET:/').length
-    console.log({ cacheLength2 })
-    console.log({ str })
+
+    // should return the default server response
    t.equal(str, 'foob')
-    t.equal(cacheLength2, cacheLength1 + 1)
 
-    console.log('Cache before second request:')
-    console.log({ cache: cache.cache })
+    t.equal(cacheLength2, cacheLength1 + 1)
 
    // response found in cache, return cached response.
    const response2 = await undici.request(`http://0.0.0.0:${serverPort}`, {
      dispatcher: new undici.Agent().compose(interceptors.cache()),
-      cache,
-      Accept: 'application/txt',
-      'User-Agent': 'Chrome',
-      origin2: 'www.google.com/images',
+      headers: {
+        Accept: 'application/txt',
+        'User-Agent': 'Chrome',
+        origin2: 'www.google.com/images',
+      },
+      cache: true,
    })
    let str2 = ''
    for await (const chunk of response2.body) {
@@ -146,9 +230,14 @@ test('cache request, no matching entry found. Store response in cache', async (t
    }
 
    const cacheLength3 = cache.get('GET:/').length
-    console.log({ cacheLength3 })
 
+    // should return the body from the cached entry
    t.equal(str2, 'asd2')
+
+    // the cache should contain the same number of entries before and after
+    // a cached entry was served as the response
    t.equal(cacheLength3, cacheLength2)
+
+    cache.database.close()
  })
})
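
A minimal usage sketch of the composed interceptor, for reference. It assumes the `interceptors.cache()` export and the `cache: true` opt-in shown in the test above, Node started with `--experimental-sqlite`, and a placeholder origin at `http://localhost:3000`:

import undici from 'undici'
import { interceptors } from './lib/index.js'

const dispatcher = new undici.Agent().compose(interceptors.cache())

// A first GET misses the cache, hits the origin, and stores the response
// under the key 'GET:/'. A repeat request whose headers match the stored
// vary data is answered from the SQLite-backed store until its ttl expires.
const res = await undici.request('http://localhost:3000/', {
  dispatcher,
  cache: true, // true selects DEFAULT_CACHE_STORE (shared in-memory SQLite)
})
console.log(await res.body.text())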