diff --git a/.gitignore b/.gitignore
index 5ed33aa..095e85c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,3 +19,4 @@ db
 .nyc*
 yarn.lock
 .tap
+file:*
diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js
index f5397f6..11af2ba 100644
--- a/lib/interceptor/cache.js
+++ b/lib/interceptor/cache.js
@@ -1,19 +1,23 @@
 import assert from 'node:assert'
-import { LRUCache } from 'lru-cache'
 import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
+import Database from 'better-sqlite3'
+
+import * as BJSON from 'buffer-json'
 
 class CacheHandler extends DecoratorHandler {
   #handler
   #store
   #key
+  #opts
   #value = null
 
-  constructor({ key, handler, store }) {
+  constructor({ key, handler, store, opts = {} }) {
     super(handler)
     this.#key = key
     this.#handler = handler
     this.#store = store
+    this.#opts = opts
   }
 
   onConnect(abort) {
@@ -23,7 +27,7 @@ class CacheHandler extends DecoratorHandler {
   }
 
   onHeaders(statusCode, rawHeaders, resume, statusMessage, headers = parseHeaders(rawHeaders)) {
-    if (statusCode !== 307) {
+    if (statusCode !== 307 && statusCode !== 200) {
       return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
     }
 
@@ -64,18 +68,16 @@ class CacheHandler extends DecoratorHandler {
             (rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
             (statusMessage?.length ?? 0) +
             64,
-          ttl: ttl * 1e3,
+          expires: Date.now() + ttl * 1e3, // ttl is in seconds; expires is an absolute ms timestamp
         }
       }
     }
-
     return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
   }
 
   onData(chunk) {
     if (this.#value) {
       this.#value.size += chunk.bodyLength
 
       const maxEntrySize = this.#store.maxEntrySize ?? Infinity
       if (this.#value.size > maxEntrySize) {
         this.#value = null
@@ -88,37 +90,147 @@
 
   onComplete(rawTrailers) {
     if (this.#value) {
+      const resHeaders = parseHeaders(this.#value.data.rawHeaders)
+
+      // Early return if Vary = *, which is uncacheable.
+      if (resHeaders.vary === '*') {
+        return this.#handler.onComplete(rawTrailers)
+      }
+
+      const reqHeaders = this.#opts
+
       this.#value.data.rawTrailers = rawTrailers
-      this.#value.size += rawTrailers?.reduce((xs, x) => xs + x.length, 0) ?? 0
-      this.#store.set(this.#key, this.#value.data, { ttl: this.#value.ttl, size: this.#value.size })
+      this.#value.size = this.#value.size
+        ? this.#value.size + (rawTrailers?.reduce((xs, x) => xs + x.length, 0) ?? 0)
+        : 0
+
+      this.#value.vary = formatVaryData(resHeaders, reqHeaders)
+
+      this.#store.set(this.#key, this.#value)
     }
 
     return this.#handler.onComplete(rawTrailers)
   }
 }
 
-// TODO (fix): Async filesystem cache.
-class CacheStore {
-  constructor({ maxSize = 1024 * 1024, maxEntrySize = 128 * 1024 }) {
-    this.maxSize = maxSize
-    this.maxEntrySize = maxEntrySize
-    this.cache = new LRUCache({ maxSize })
+function formatVaryData(resHeaders, reqHeaders) {
+  return resHeaders.vary
+    ?.split(',')
+    .map((key) => key.trim().toLowerCase())
+    .map((key) => [key, reqHeaders[key] ?? reqHeaders.headers?.[key]])
+    .filter(([_key, val]) => val)
+}
+
+export class CacheStore {
+  connected = false
+  #insertQuery
+  #getQuery
+  #purgeExpiredQuery
+  #purgeOldestQuery
+  #database
+  #sizeQuery
+  #size = 0
+  #maxSize = 128e9
+  #maxTTL = Infinity
+
+  constructor(location = ':memory:', opts = {}) {
+    this.#maxSize = opts.maxSize ?? this.#maxSize
+    this.#maxTTL = opts.maxTTL ?? this.#maxTTL
+
+    this.#database = new Database(location)
+
+    this.#database.exec(`
+      CREATE TABLE IF NOT EXISTS cacheInterceptor(
+        id INTEGER PRIMARY KEY AUTOINCREMENT,
+        key TEXT,
+        data TEXT,
+        vary TEXT,
+        size INTEGER,
+        expires INTEGER
+      ) STRICT
+    `)
+
+    this.#insertQuery = this.#database.prepare(
+      'INSERT INTO cacheInterceptor (key, data, vary, size, expires) VALUES (?, ?, ?, ?, ?)',
+    )
+    this.#getQuery = this.#database.prepare(
+      'SELECT * FROM cacheInterceptor WHERE key = ? AND expires > ?',
+    )
+    this.#purgeExpiredQuery = this.#database.prepare(
+      'DELETE FROM cacheInterceptor WHERE expires < ?',
+    )
+    this.#sizeQuery = this.#database.prepare('SELECT SUM(size) FROM cacheInterceptor')
+    this.#purgeOldestQuery = this.#database.prepare(`
+      DELETE FROM cacheInterceptor
+      WHERE id IN (
+        SELECT id
+        FROM cacheInterceptor
+        ORDER BY expires ASC
+        LIMIT (SELECT CEILING(COUNT(*) * 0.10) FROM cacheInterceptor)
+      );
+    `)
+
+    this.connected = true
   }
 
-  set(key, value, opts) {
-    this.cache.set(key, value, opts)
+  set(key, { data, vary, size, expires }) {
+    if (expires < Date.now()) {
+      return
+    }
+    expires = Math.min(expires, Date.now() + this.#maxTTL)
+
+    this.#insertQuery.run(key, BJSON.stringify(data), BJSON.stringify(vary), size, expires)
+
+    this.#size += size
+    this.#maybePurge()
   }
 
   get(key) {
-    return this.cache.get(key)
+    const rows = this.#getQuery.all(key, Date.now())
+
+    return rows.map((row) => {
+      const { data, vary, size, expires } = row
+      return {
+        data: BJSON.parse(data),
+        vary: JSON.parse(vary),
+        size: parseInt(size),
+        expires: parseInt(expires),
+      }
+    })
+  }
+
+  close() {
+    this.#database.close()
+    this.connected = false
+  }
+
+  #maybePurge() {
+    if (this.#size == null || this.#size > this.#maxSize) {
+      this.#purgeExpiredQuery.run(Date.now())
+      this.#size = this.#sizeQuery.get()['SUM(size)']
+
+      // In the case where the cache is full but has no expired entries yet, delete 10% of the
+      // cache, ordered by the oldest entries according to the 'expires' column.
+      if (this.#size > this.#maxSize) {
+        this.#purgeOldestQuery.run()
+        this.#size = this.#sizeQuery.get()['SUM(size)']
+      }
+    }
   }
 }
 
-function makeKey(opts) {
-  // NOTE: Ignores headers...
-  return `${opts.origin}:${opts.method}:${opts.path}`
+function findEntryByHeaders(entries, request) {
+  const foundEntry = entries?.find(
+    (entry) =>
+      entry.vary?.every(([key, val]) => {
+        return request?.headers[key] === val || request[key] === val
+      }) ?? true,
+  )
+
+  // If no entry matches the request's vary headers exactly, fall back to the first stored entry.
+  return foundEntry ?? entries?.[0]
 }
 
-const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
+let cacheInstance = null
 
 export default (opts) => (dispatch) => (opts, handler) => {
   if (!opts.cache || opts.upgrade) {
@@ -148,23 +260,38 @@ export default (opts) => (dispatch) => (opts, handler) => {
 
   // Dump body...
   opts.body?.on('error', () => {}).resume()
+
+  opts.host = opts.host ?? new URL(opts.origin).host
 
-  const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache
+  if (!opts.headers) {
+    opts.headers = {}
+  }
 
-  if (!store) {
-    throw new Error(`Cache store not provided.`)
+  // Supported opts.cache values: [true, false, 'clear', custom cache instance]
+  // Create a new cache instance if none exists yet.
+  if (opts.cache === 'clear' || (!cacheInstance?.connected && opts.cache === true)) {
+    cacheInstance = new CacheStore()
   }
 
-  let key = makeKey(opts)
-  let value = store.get(key)
+  // Or use the provided cache instead.
+  if (typeof opts.cache === 'object') {
+    cacheInstance = opts.cache
+  }
 
-  if (value == null && opts.method === 'HEAD') {
-    key = makeKey({ ...opts, method: 'GET' })
-    value = store.get(key)
+  if (!cacheInstance) {
+    throw new Error(`Cache store not provided.`)
   }
 
-  if (value) {
-    const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = value
+  const key = `${opts.method}:${opts.path}`
+
+  let entries = cacheInstance.get(key)
+
+  // For HEAD requests with no cached entry, fall back to a cached GET response for the same path.
+  if (entries.length === 0 && opts.method === 'HEAD') {
+    entries = cacheInstance.get(`GET:${opts.path}`)
+  }
+
+  const entry = findEntryByHeaders(entries, opts)
+
+  if (entry) {
+    const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = entry.data
     const ac = new AbortController()
     const signal = ac.signal
 
@@ -176,11 +303,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
     try {
       handler.onConnect(abort)
       signal.throwIfAborted()
+
       handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
       signal.throwIfAborted()
+
      if (opts.method !== 'HEAD') {
         for (const chunk of body) {
           const ret = handler.onData(chunk)
+          signal.throwIfAborted()
           if (ret === false) {
             // TODO (fix): back pressure...
           }
@@ -196,6 +326,6 @@ export default (opts) => (dispatch) => (opts, handler) => {
 
     return true
   } else {
-    return dispatch(opts, new CacheHandler({ handler, store, key: makeKey(opts) }))
+    return dispatch(opts, new CacheHandler({ handler, store: cacheInstance, key, opts }))
   }
 }
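For reviewers, a minimal usage sketch of the rewritten interceptor and its opts.cache contract (hedged: the request URL, the on-disk database path, and the size/TTL values are illustrative, not part of this change):

    import undici from 'undici'
    import { interceptors } from './lib/index.js'
    import { CacheStore } from './lib/interceptor/cache.js'

    const dispatcher = new undici.Agent().compose(interceptors.cache())

    // opts.cache accepts: true (lazily creates a shared store), 'clear' (replaces
    // the shared store with a fresh one), a CacheStore instance, or
    // false/undefined (bypasses caching entirely).
    const store = new CacheStore('./cache.db', { maxSize: 1e9, maxTTL: 24 * 60 * 60 * 1e3 })

    const res = await undici.request('http://example.com/', { dispatcher, cache: store })
    console.log(res.statusCode)

    store.close()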
diff --git a/lib/interceptor/proxy.js b/lib/interceptor/proxy.js
index 1dd95bb..93a7fc3 100644
--- a/lib/interceptor/proxy.js
+++ b/lib/interceptor/proxy.js
@@ -14,6 +14,7 @@ class Handler extends DecoratorHandler {
   }
 
   onUpgrade(statusCode, rawHeaders, socket) {
+    console.log('Proxy onUpgrade')
     return this.#handler.onUpgrade(
       statusCode,
       reduceHeaders(
@@ -34,6 +35,7 @@ class Handler extends DecoratorHandler {
   }
 
   onHeaders(statusCode, rawHeaders, resume, statusMessage) {
+    console.log('Proxy onHeaders')
     return this.#handler.onHeaders(
       statusCode,
       reduceHeaders(
@@ -164,6 +166,7 @@ function printIp(address, port) {
 }
 
 export default (opts) => (dispatch) => (opts, handler) => {
+  console.log('Proxy default dispatch')
   if (!opts.proxy) {
     return dispatch(opts, handler)
   }
diff --git a/lib/interceptor/redirect.js b/lib/interceptor/redirect.js
index 40a5282..6ccf298 100644
--- a/lib/interceptor/redirect.js
+++ b/lib/interceptor/redirect.js
@@ -36,6 +36,7 @@ class Handler extends DecoratorHandler {
   }
 
   onConnect(abort) {
+    console.log('Redirect onConnect')
     if (this.#aborted) {
       abort(this.#reason)
     } else {
@@ -48,6 +49,7 @@ class Handler extends DecoratorHandler {
   }
 
   onHeaders(statusCode, rawHeaders, resume, statusText, headers = parseHeaders(rawHeaders)) {
+    console.log('Redirect onHeaders')
     if (redirectableStatusCodes.indexOf(statusCode) === -1) {
       assert(!this.#headersSent)
       this.#headersSent = true
@@ -109,6 +111,7 @@ class Handler extends DecoratorHandler {
   }
 
   onData(chunk) {
+    console.log('Redirect onData')
     if (this.#location) {
       /*
         https://tools.ietf.org/html/rfc7231#section-6.4
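The console.log calls added to proxy.js and redirect.js trace undici's handler lifecycle during dispatch. As a reference for reading that output, here is a rough pass-through sketch of the hook order (an assumption for illustration; the real base class is DecoratorHandler in lib/utils.js and may differ in detail):

    // Hypothetical tracing handler; mirrors the hook names used above.
    class TracingHandler {
      #handler
      constructor(handler) {
        this.#handler = handler
      }
      onConnect(abort) {
        console.log('onConnect') // once, before headers; abort() cancels the request
        return this.#handler.onConnect(abort)
      }
      onHeaders(statusCode, rawHeaders, resume, statusMessage) {
        console.log('onHeaders', statusCode) // once per response, before any body data
        return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
      }
      onData(chunk) {
        console.log('onData', chunk.length) // zero or more times while the body streams
        return this.#handler.onData(chunk)
      }
      onComplete(rawTrailers) {
        console.log('onComplete') // once, after the body is fully consumed
        return this.#handler.onComplete(rawTrailers)
      }
      onError(err) {
        console.log('onError', err.message) // terminal; no further hooks fire
        return this.#handler.onError(err)
      }
    }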
diff --git a/package.json b/package.json
index 26ac3e0..2f60564 100644
--- a/package.json
+++ b/package.json
@@ -9,10 +9,11 @@
     "lib/*"
   ],
   "dependencies": {
+    "better-sqlite3": "^11.2.1",
+    "buffer-json": "^2.0.0",
     "cache-control-parser": "^2.0.6",
     "cacheable-lookup": "^7.0.0",
     "http-errors": "^2.0.0",
-    "lru-cache": "^11.0.0",
     "undici": "^6.19.5"
   },
   "devDependencies": {
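package.json swaps lru-cache for better-sqlite3 plus buffer-json. buffer-json matters here because cached entries carry Buffers (rawHeaders, body chunks): plain JSON.stringify flattens a Buffer into a { type: 'Buffer', data: [...] } object that JSON.parse never revives. A quick round-trip sketch (the entry value is illustrative):

    import * as BJSON from 'buffer-json'

    const entry = { rawHeaders: [Buffer.from('Content-Type'), Buffer.from('text/html')] }

    const text = BJSON.stringify(entry) // Buffers are serialized to compact strings
    const back = BJSON.parse(text) // ...and revived as real Buffers

    console.log(Buffer.isBuffer(back.rawHeaders[0])) // true
    console.log(back.rawHeaders[1].toString()) // 'text/html'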
diff --git a/test/cache.js b/test/cache.js
index 4a8fdc4..5723e3c 100644
--- a/test/cache.js
+++ b/test/cache.js
@@ -2,23 +2,359 @@ import { test } from 'tap'
 import { createServer } from 'node:http'
 import undici from 'undici'
 import { interceptors } from '../lib/index.js'
+import { CacheStore } from '../lib/interceptor/cache.js'
 
-test('cache request', (t) => {
-  t.plan(1)
+function exampleEntries() {
+  const rawHeaders1 = [
+    Buffer.from('Content-Type'),
+    Buffer.from('application/json'),
+    Buffer.from('Content-Length'),
+    Buffer.from('10'),
+    Buffer.from('Cache-Control'),
+    Buffer.from('public'),
+  ]
+  const rawHeaders2 = [
+    Buffer.from('Accept'),
+    Buffer.from('application/txt'),
+    Buffer.from('Content-Length'),
+    Buffer.from('4'),
+    Buffer.from('origin2'),
+    Buffer.from('www.google.com/images'),
+    Buffer.from('User-Agent'),
+    Buffer.from('Chrome'),
+    Buffer.from('Cache-Control'),
+    Buffer.from('public'),
+  ]
+
+  const entries = [
+    {
+      data: {
+        statusCode: 200,
+        statusMessage: '',
+        rawHeaders: rawHeaders1,
+        rawTrailers: ['Hello', 'world'],
+        body: ['asd1'],
+      },
+      vary: [
+        ['Accept', 'application/xml'],
+        ['User-Agent', 'Mozilla/5.0'],
+      ],
+      size: 100,
+      expires: Date.now() * 2 + Math.floor(Math.random() * 100),
+    },
+    {
+      data: {
+        statusCode: 200,
+        statusMessage: '',
+        rawHeaders: rawHeaders2,
+        rawTrailers: ['Hello', 'world'],
+        body: ['asd2'],
+      },
+      vary: [
+        ['Accept', 'application/txt'],
+        ['User-Agent', 'Chrome'],
+        ['origin2', 'www.google.com/images'],
+      ],
+      size: 100,
+      expires: Date.now() * 2 + Math.floor(Math.random() * 100),
+    },
+    {
+      data: {
+        statusCode: 200,
+        statusMessage: 'first',
+        rawHeaders: rawHeaders1,
+        rawTrailers: ['Hello'],
+        body: ['asd4'],
+      },
+      vary: [
+        ['Accept', 'application/json'],
+        ['User-Agent', 'Mozilla/5.0'],
+        ['host2', 'www.google.com'],
+        ['origin2', 'www.google.com/images'],
+      ],
+      size: 100,
+      expires: Date.now() * 2 + Math.floor(Math.random() * 100),
+    },
+    {
+      data: {
+        statusCode: 200,
+        statusMessage: 'to be purged',
+        rawHeaders: rawHeaders1,
+        rawTrailers: ['Hello'],
+        body: ['asd4'],
+      },
+      vary: [
+        ['Accept', 'application/json'],
+        ['User-Agent', 'Mozilla/5.0'],
+        ['host2', 'www.google.com'],
+        ['origin2', 'www.google.com/images'],
+      ],
+      size: 100,
+      expires: Date.now(),
+    },
+  ]
+  return entries
+}
+
+function dbsetup() {
+  const cache = new CacheStore(':memory:')
+  return cache
+}
+
+function seedCache(cache) {
+  for (const entry of exampleEntries()) {
+    cache.set('GET:/', entry)
+  }
+  return exampleEntries().length
+}
+
+test('If no matching entry found, store the response in cache. Else return a matching entry.', (t) => {
+  t.plan(4)
+  const server = createServer((req, res) => {
+    res.writeHead(307, {
+      Vary: 'Origin2, User-Agent, Accept',
+      'Cache-Control': 'public, immutable',
+      'Content-Length': 4,
+      'Content-Type': 'text/html',
+      Connection: 'close',
+      Location: 'http://www.google.com/',
+      datenow: Date.now(),
+    })
+    res.end('foob')
+  })
+
+  t.teardown(server.close.bind(server))
+
+  const cache = dbsetup()
+
+  server.listen(0, async () => {
+    const serverPort = server.address().port
+    // response not found in cache, response should be added to cache.
+    const response1 = await undici.request(`http://0.0.0.0:${serverPort}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      cache,
+    })
+    let str = ''
+    for await (const chunk of response1.body) {
+      str += chunk
+    }
+    const cacheLength1 = cache.get('GET:/').length
+    const added = seedCache(cache) - 1 // one entry expires immediately and is never stored
+
+    // should return the default server response
+    t.equal(str, 'foob')
+
+    t.equal(cacheLength1, 1)
+
+    // response found in cache, return cached response.
+    const response2 = await undici.request(`http://0.0.0.0:${serverPort}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      headers: {
+        Accept: 'application/txt',
+        'User-Agent': 'Chrome',
+        origin2: 'www.google.com/images',
+      },
+      cache,
+    })
+
+    const cacheLength2 = cache.get('GET:/').length
+
+    // should return the same response
+    t.equal(response1.headers.datenow, response2.headers.datenow)
+
+    // cache should still have the same number of entries before
+    // and after a cached entry was used as a response.
+    t.equal(cacheLength2, cacheLength1 + added)
+
+    cache.close()
+  })
+})
+
+test('Responses with header Vary: * should not be cached', (t) => {
+  t.plan(2)
+  const server = createServer((req, res) => {
+    res.writeHead(307, {
+      Vary: '*',
+      'Cache-Control': 'public, immutable',
+      'Content-Length': 4,
+      'Content-Type': 'text/html',
+      Connection: 'close',
+      Location: 'http://www.google.com/',
+    })
+    res.end('foob')
+  })
+
+  t.teardown(server.close.bind(server))
+
+  const cache = dbsetup()
+
+  const cacheLength1 = cache.get('GET:/').length
+
+  server.listen(0, async () => {
+    const serverPort = server.address().port
+    // Response not found in cache, so it would normally be added.
+    // But the server returns Vary: *, so it must not be cached.
+    const response = await undici.request(`http://0.0.0.0:${serverPort}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      cache, // use the CacheStore created earlier with dbsetup()
+      headers: {
+        Accept: 'application/txt',
+        'User-Agent': 'Chrome',
+        origin2: 'response should not be cached',
+      },
+    })
+    let str = ''
+    for await (const chunk of response.body) {
+      str += chunk
+    }
+    const cacheLength2 = cache.get('GET:/').length
+
+    // should return the default server response
+    t.equal(str, 'foob')
+
+    t.equal(cacheLength2, cacheLength1)
+
+    cache.close()
+  })
+})
+
+test('307-Redirect Vary on Host, save to cache, fetch from cache', (t) => {
+  t.plan(3)
   const server = createServer((req, res) => {
+    res.writeHead(307, {
+      Vary: 'Host',
+      'Cache-Control': 'public, immutable',
+      'Content-Length': 3,
+      'Content-Type': 'text/html',
+      Connection: 'keep-alive',
+      Location: 'http://www.blankwebsite.com/',
+      datenow: Date.now(),
+    })
     res.end('asd')
   })
 
   t.teardown(server.close.bind(server))
+
+  // entry not found, save to cache
   server.listen(0, async () => {
-    const { body } = await undici.request(`http://0.0.0.0:${server.address().port}`, {
+    const response1 = await undici.request(`http://0.0.0.0:${server.address().port}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      cache: true,
+    })
+    let str1 = ''
+    for await (const chunk of response1.body) {
+      str1 += chunk
+    }
+
+    t.equal(str1, 'asd')
+
+    // entry found, fetch from cache
+    const response2 = await undici.request(`http://0.0.0.0:${server.address().port}`, {
       dispatcher: new undici.Agent().compose(interceptors.cache()),
       cache: true,
     })
+    let str2 = ''
+    for await (const chunk of response2.body) {
+      str2 += chunk
+    }
+
+    t.equal(response1.headers.datenow, response2.headers.datenow)
+    t.equal(str2, 'asd')
   })
 })
+
+test('Cache purging based on its maxSize', (t) => {
+  t.plan(1)
+  const cache = new CacheStore(':memory:', { maxSize: 500 })
+
+  exampleEntries()
+    .concat(exampleEntries())
+    .concat(exampleEntries())
+    .concat(exampleEntries())
+    .concat(exampleEntries())
+    .concat(exampleEntries())
+    .concat(exampleEntries()) // total size inserted: 2100
+    .forEach((i) => cache.set('GET:/', i))
+
+  const rows = cache.get('GET:/')
+  const totalSize = rows.reduce((acc, r) => r.size + acc, 0)
+
+  t.equal(totalSize, 500)
+})
+
+test('Cache #maxTTL overwriting ttl of individual entries', (t) => {
+  t.plan(1)
+
+  const day = 1000 * 60 * 60 * 24
+  const cache = new CacheStore(':memory:', { maxTTL: day })
+  exampleEntries().forEach((i) => cache.set('GET:/', i))
+
+  const row = cache.get('GET:/')[0]
+  const rowExpires = Math.floor(row.expires / 1000)
+  const maxExpires = Math.floor((Date.now() + day) / 1000)
+
+  t.equal(rowExpires, maxExpires)
+})
+
+test('200-OK, save to cache, fetch from cache', (t) => {
+  t.plan(4)
+  const server = createServer((req, res) => {
+    res.writeHead(200, {
+      Vary: 'User-Agent, Accept',
+      'Cache-Control': 'public, immutable',
+      'Content-Length': 4,
+      'Content-Type': 'text/html',
+      Connection: 'close',
+      Location: 'http://www.google.com/',
+      datenow: Date.now(),
+    })
+    res.end('foob')
+  })
+
+  t.teardown(server.close.bind(server))
+
+  const cache = dbsetup()
+
+  server.listen(0, async () => {
+    const serverPort = server.address().port
+    // response not found in cache, response should be added to cache.
+    const response1 = await undici.request(`http://0.0.0.0:${serverPort}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      cache,
+    })
     let str = ''
-    for await (const chunk of body) {
+    for await (const chunk of response1.body) {
       str += chunk
     }
-    t.equal(str, 'asd')
+
+    const cacheLength1 = cache.get('GET:/').length
+
+    t.equal(cacheLength1, 1)
+
+    // should return the default server response
+    t.equal(str, 'foob')
+
+    const added = seedCache(cache) - 1 // one entry expires immediately and is never stored
+
+    // response found in cache, return cached response.
+    const response2 = await undici.request(`http://0.0.0.0:${serverPort}`, {
+      dispatcher: new undici.Agent().compose(interceptors.cache()),
+      headers: {
+        Accept: 'application/txt',
+        'User-Agent': 'Chrome',
+      },
+      cache,
+    })
+
+    const cacheLength2 = cache.get('GET:/').length
+
+    // should return the response from the cached entry
+    t.equal(response2.headers.datenow, response1.headers.datenow)
+
+    // cache should still have the same number of entries before
+    // and after a cached entry was used as a response.
+    t.equal(cacheLength2, added + cacheLength1)
+
+    cache.close()
   })
 })
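Finally, a standalone sketch of the CacheStore contract the tests above exercise (assumptions: in-memory database; the key, size, and TTL values are illustrative):

    import { CacheStore } from './lib/interceptor/cache.js'

    const store = new CacheStore(':memory:', { maxSize: 500, maxTTL: 24 * 60 * 60 * 1e3 })

    store.set('GET:/', {
      data: { statusCode: 200, statusMessage: '', rawHeaders: [], rawTrailers: [], body: ['ok'] },
      vary: [['accept', 'text/html']],
      size: 100,
      expires: Date.now() + 60_000, // clamped to maxTTL on insert
    })

    const entries = store.get('GET:/') // only returns rows with expires > Date.now()
    console.log(entries.length, entries[0]?.data.body) // 1 [ 'ok' ]

    store.close()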