Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cache): use vary headers to compare cached response with request headers #3

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ db
.nyc*
yarn.lock
.tap
file:*
194 changes: 162 additions & 32 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import assert from 'node:assert'
import { LRUCache } from 'lru-cache'
import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
import Database from 'better-sqlite3'

import * as BJSON from 'buffer-json'

class CacheHandler extends DecoratorHandler {
#handler
#store
#key
#opts
#value = null

constructor({ key, handler, store }) {
constructor({ key, handler, store, opts = [] }) {
super(handler)

this.#key = key
this.#handler = handler
this.#store = store
this.#opts = opts
}

onConnect(abort) {
Expand All @@ -23,7 +27,7 @@ class CacheHandler extends DecoratorHandler {
}

onHeaders(statusCode, rawHeaders, resume, statusMessage, headers = parseHeaders(rawHeaders)) {
if (statusCode !== 307) {
if (statusCode !== 307 && statusCode !== 200) {
return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

Expand Down Expand Up @@ -64,18 +68,16 @@ class CacheHandler extends DecoratorHandler {
(rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
(statusMessage?.length ?? 0) +
64,
ttl: ttl * 1e3,
expires: Date.now() + ttl, // in ms!
}
}
}

return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

onData(chunk) {
if (this.#value) {
this.#value.size += chunk.bodyLength

const maxEntrySize = this.#store.maxEntrySize ?? Infinity
if (this.#value.size > maxEntrySize) {
this.#value = null
Expand All @@ -88,37 +90,147 @@ class CacheHandler extends DecoratorHandler {

onComplete(rawTrailers) {
if (this.#value) {
const resHeaders = parseHeaders(this.#value.data.rawHeaders)

// Early return if Vary = *, uncacheable.
if (resHeaders.vary === '*') {
return this.#handler.onComplete(rawTrailers)
}

const reqHeaders = this.#opts

this.#value.data.rawTrailers = rawTrailers
this.#value.size += rawTrailers?.reduce((xs, x) => xs + x.length, 0) ?? 0
this.#store.set(this.#key, this.#value.data, { ttl: this.#value.ttl, size: this.#value.size })
this.#value.size = this.#value.size
? this.#value.size + rawTrailers?.reduce((xs, x) => xs + x.length, 0)
: 0

this.#value.vary = formatVaryData(resHeaders, reqHeaders)
IsakT marked this conversation as resolved.
Show resolved Hide resolved

this.#store.set(this.#key, this.#value)
}
return this.#handler.onComplete(rawTrailers)
}
}

// TODO (fix): Async filesystem cache.
class CacheStore {
constructor({ maxSize = 1024 * 1024, maxEntrySize = 128 * 1024 }) {
this.maxSize = maxSize
this.maxEntrySize = maxEntrySize
this.cache = new LRUCache({ maxSize })
function formatVaryData(resHeaders, reqHeaders) {
return resHeaders.vary
?.split(',')
.map((key) => key.trim().toLowerCase())
.map((key) => [key, reqHeaders[key] ?? reqHeaders.headers[key]])
.filter(([_key, val]) => val)
}

export class CacheStore {
connected = false
#insertquery
#getQuery
#purgeExpiredQuery
#purgeOldestQuery
#database
#sizeQuery
#size = 0
#maxSize = 128e9
#maxTTL = Infinity

constructor(location = ':memory:', opts = {}) {
this.#maxSize = opts.maxSize ?? this.#maxSize
this.#maxTTL = opts.maxTTL ?? this.#maxTTL

this.#database = new Database(location)

this.#database.exec(`
CREATE TABLE IF NOT EXISTS cacheInterceptor(
id INTEGER PRIMARY KEY AUTOINCREMENT,
key TEXT,
data TEXT,
vary TEXT,
size INTEGER,
expires INTEGER
) STRICT
`)

this.#insertquery = this.#database.prepare(
'INSERT INTO cacheInterceptor (key, data, vary, size, expires) VALUES (?, ?, ?, ?, ?)',
)
this.#getQuery = this.#database.prepare(
'SELECT * FROM cacheInterceptor WHERE key = ? AND expires > ?',
)
this.#purgeExpiredQuery = this.#database.prepare(
'DELETE FROM cacheInterceptor WHERE expires < ?',
)
this.#sizeQuery = this.#database.prepare('SELECT SUM(size) FROM cacheInterceptor')
this.#purgeOldestQuery = this.#database.prepare(`
DELETE FROM cacheInterceptor
WHERE id IN (
SELECT id
FROM cacheInterceptor
ORDER BY expires ASC
LIMIT (SELECT CEILING(COUNT(*) * 0.10) FROM cacheInterceptor)
);
`)

this.connected = true
}

set(key, value, opts) {
this.cache.set(key, value, opts)
set(key, { data, vary, size, expires }) {
if (expires < Date.now()) {
return
}
expires = Math.min(expires, Date.now() + this.#maxTTL)

this.#insertquery.run(key, BJSON.stringify(data), BJSON.stringify(vary), size, expires)

this.#size += size
this.#maybePurge()
}

get(key) {
return this.cache.get(key)
const rows = this.#getQuery.all(key, Date.now())

return rows.map((row) => {
const { data, vary, size, expires } = row
return {
data: BJSON.parse(data),
vary: JSON.parse(vary),
size: parseInt(size),
expires: parseInt(expires),
}
})
}

close() {
this.#database.close()
this.connected = false
}

#maybePurge() {
if (this.#size == null || this.#size > this.#maxSize) {
this.#purgeExpiredQuery.run(Date.now())
this.#size = this.#sizeQuery.get()['SUM(size)']

// In the case where the cache is full but has no expired entries yet, delete 10% of the cache, ordered by
// the oldest entries according to the 'expires' column.
if (this.#size > this.#maxSize) {
this.#purgeOldestQuery.run()
this.#size = this.#sizeQuery.get()['SUM(size)']
}
IsakT marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

function makeKey(opts) {
// NOTE: Ignores headers...
return `${opts.origin}:${opts.method}:${opts.path}`
function findEntryByHeaders(entries, request) {
const foundEntry = entries?.find(
(entry) =>
entry.vary?.every(([key, val]) => {
return request?.headers[key] === val || request[key] === val
}) ?? true,
ronag marked this conversation as resolved.
Show resolved Hide resolved
)

// if no exact match was found, take the latest added entry
return foundEntry ?? entries[0]
}

const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
let cacheInstance = null

export default (opts) => (dispatch) => (opts, handler) => {
if (!opts.cache || opts.upgrade) {
Expand Down Expand Up @@ -148,23 +260,38 @@ export default (opts) => (dispatch) => (opts, handler) => {

// Dump body...
opts.body?.on('error', () => {}).resume()
opts.host = opts.host ?? new URL(opts.origin).host

const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache
if (!opts.headers) {
opts.headers = {}
}

if (!store) {
throw new Error(`Cache store not provided.`)
// Supported opts.cache values: [true, false, 'clear', custom cache]
// create new cache instance if none exists
if (opts.cache === 'clear' || (!cacheInstance?.connected && opts.cache === true)) {
cacheInstance = new CacheStore()
}

let key = makeKey(opts)
let value = store.get(key)
// or use provided cache instead
if (typeof opts.cache === 'object') {
cacheInstance = opts.cache
}

if (value == null && opts.method === 'HEAD') {
key = makeKey({ ...opts, method: 'GET' })
value = store.get(key)
if (!cacheInstance) {
throw new Error(`Cache store not provided.`)
}

if (value) {
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = value
const key = `${opts.method}:${opts.path}`

const entries =
(cacheInstance.get(key) ?? opts.method === 'HEAD')
? cacheInstance.get(`GET:${opts.path}`)
: null

const entry = findEntryByHeaders(entries, opts)

if (entry) {
const { statusCode, statusMessage, rawHeaders, rawTrailers, body } = entry.data
const ac = new AbortController()
const signal = ac.signal

Expand All @@ -176,11 +303,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
try {
handler.onConnect(abort)
signal.throwIfAborted()

handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
signal.throwIfAborted()

if (opts.method !== 'HEAD') {
for (const chunk of body) {
const ret = handler.onData(chunk)

signal.throwIfAborted()
if (ret === false) {
// TODO (fix): back pressure...
Expand All @@ -196,6 +326,6 @@ export default (opts) => (dispatch) => (opts, handler) => {

return true
} else {
return dispatch(opts, new CacheHandler({ handler, store, key: makeKey(opts) }))
return dispatch(opts, new CacheHandler({ handler, store: cacheInstance, key, opts }))
}
}
3 changes: 3 additions & 0 deletions lib/interceptor/proxy.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Handler extends DecoratorHandler {
}

onUpgrade(statusCode, rawHeaders, socket) {
console.log('Proxy onUpgrade')
return this.#handler.onUpgrade(
statusCode,
reduceHeaders(
Expand All @@ -34,6 +35,7 @@ class Handler extends DecoratorHandler {
}

onHeaders(statusCode, rawHeaders, resume, statusMessage) {
console.log('Proxy onHeaders')
return this.#handler.onHeaders(
statusCode,
reduceHeaders(
Expand Down Expand Up @@ -164,6 +166,7 @@ function printIp(address, port) {
}

export default (opts) => (dispatch) => (opts, handler) => {
console.log('Proxy default dispatch')
if (!opts.proxy) {
return dispatch(opts, handler)
}
Expand Down
3 changes: 3 additions & 0 deletions lib/interceptor/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Handler extends DecoratorHandler {
}

onConnect(abort) {
console.log('Redirect onConnect')
if (this.#aborted) {
abort(this.#reason)
} else {
Expand All @@ -48,6 +49,7 @@ class Handler extends DecoratorHandler {
}

onHeaders(statusCode, rawHeaders, resume, statusText, headers = parseHeaders(rawHeaders)) {
console.log('Redirect onHeaders')
if (redirectableStatusCodes.indexOf(statusCode) === -1) {
assert(!this.#headersSent)
this.#headersSent = true
Expand Down Expand Up @@ -109,6 +111,7 @@ class Handler extends DecoratorHandler {
}

onData(chunk) {
console.log('Redirect onData')
if (this.#location) {
/*
https://tools.ietf.org/html/rfc7231#section-6.4
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
"lib/*"
],
"dependencies": {
"better-sqlite3": "^11.2.1",
"buffer-json": "^2.0.0",
"cache-control-parser": "^2.0.6",
"cacheable-lookup": "^7.0.0",
"http-errors": "^2.0.0",
"lru-cache": "^11.0.0",
"undici": "^6.19.5"
},
"devDependencies": {
Expand Down
Loading