Skip to content

Commit

Permalink
feat(cache): added the built-in nodejs/sqlite as the in-memory cache.
Browse files Browse the repository at this point in the history
  • Loading branch information
IsakT committed Aug 13, 2024
1 parent 3d40ba2 commit 930bdb2
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 58 deletions.
105 changes: 80 additions & 25 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
import assert from 'node:assert'
// import { LRUCache } from 'lru-cache'
import { DecoratorHandler, parseHeaders, parseCacheControl } from '../utils.js'
import { DatabaseSync } from 'node:sqlite' // --experimental-sqlite

class CacheHandler extends DecoratorHandler {
#handler
#store
#key
#opts
#value = null

constructor({ key, handler, store }) {
constructor({ key, handler, store, opts = [] }) {
super(handler)

this.#key = key
this.#handler = handler
this.#store = store
this.#opts = opts
}

onConnect(abort) {
Expand Down Expand Up @@ -64,11 +66,10 @@ class CacheHandler extends DecoratorHandler {
(rawHeaders?.reduce((xs, x) => xs + x.length, 0) ?? 0) +
(statusMessage?.length ?? 0) +
64,
ttl: ttl * 1e3,
ttl, // in ms!
}
}
}

return this.#handler.onHeaders(statusCode, rawHeaders, resume, statusMessage, headers)
}

Expand All @@ -88,20 +89,16 @@ class CacheHandler extends DecoratorHandler {

onComplete(rawTrailers) {
if (this.#value) {
// get
const entries = this.#handler.entries
const resHeaders = parseHeaders(this.#value.data.rawHeaders)
const reqHeaders = this.#handler.opts
const reqHeaders = this.#opts

// set
this.#value.data.rawTrailers = rawTrailers
this.#value.size = this.#value.size
? this.#value.size + rawTrailers?.reduce((xs, x) => xs + x.length, 0)
: 0
this.#value.vary = formatVaryData(resHeaders, reqHeaders)
entries.push(this.#value)
sortEntriesByVary(entries)
this.#store.set(this.#key, entries)

this.#store.set(this.#key, this.#value)
}
return this.#handler.onComplete(rawTrailers)
}
Expand All @@ -115,20 +112,72 @@ function formatVaryData(resHeaders, reqHeaders) {
.filter(([_key, val]) => val)
}

// TODO (fix): Async filesystem cache.
// Can we move this class somewhere else, to the util.js file or its own module?
class CacheStore {
constructor({ maxSize = 1024 * 1024, maxEntrySize = 128 * 1024 }) {
this.maxSize = maxSize
this.maxEntrySize = maxEntrySize
this.cache = new Map()
constructor() {
this.database = null
this.init()
}

set(key, value, opts) {
this.cache.set(key, value)
init() {
this.database = new DatabaseSync('file:memdb1?mode=memory&cache=shared')

this.database.exec(`
CREATE TABLE IF NOT EXISTS cacheInterceptor(
key TEXT,
data TEXT,
vary TEXT,
size INTEGER,
ttl INTEGER,
insertTime INTEGER
) STRICT
`)
}

set(key, entry, opts) {
if (!this.database) {
throw new Error('Database not initialized')
}

entry.data = JSON.stringify(entry.data)
entry.vary = JSON.stringify(entry.vary)

const insert = this.database.prepare(
'INSERT INTO cacheInterceptor (key, data, vary, size, ttl, insertTime) VALUES (?, ?, ?, ?, ?, ?)',
)

insert.run(key, entry.data, entry.vary, entry.size, entry.ttl, Date.now())

this.purge()
}

get(key) {
return this.cache.get(key)
if (!this.database) {
throw new Error('Database not initialized')
}
this.purge()
const query = this.database.prepare('SELECT * FROM cacheInterceptor WHERE key = ?')
const rows = query.all(key)
rows.map((i) => {
i.data = JSON.parse(i.data)
i.vary = JSON.parse(i.vary)
return i
})

// Just in case purge hasn't finished
const nonExpiredRows = rows.filter((i) => i.insertTime + i.ttl > Date.now())

return nonExpiredRows
}

purge() {
const query = this.database.prepare('DELETE FROM cacheInterceptor WHERE insertTime + ttl < ?')
query.run(Date.now())
}

deleteAll() {
const query = this.database.prepare('DELETE FROM cacheInterceptor')
query.run()
}
}

Expand All @@ -151,12 +200,12 @@ function findEntryByHeaders(entries, reqHeaders) {
return entries?.find(
(entry) =>
entry.vary?.every(([key, val]) => {
return reqHeaders[key] === val
return reqHeaders?.headers[key] === val
}) ?? true,
)
}

const DEFAULT_CACHE_STORE = new CacheStore({ maxSize: 128 * 1024, maxEntrySize: 1024 })
const DEFAULT_CACHE_STORE = new CacheStore()

export default (opts) => (dispatch) => (opts, handler) => {
if (!opts.cache || opts.upgrade) {
Expand Down Expand Up @@ -189,14 +238,19 @@ export default (opts) => (dispatch) => (opts, handler) => {

opts.host = opts.host ?? new URL(opts.origin).host

if (!opts.headers) {
opts.headers = {}
}

// idea: use DEFAULT_CACHE_STORE by default if 'cache' not specified, since the cache interceptor was already specified to be used.
const store = opts.cache === true ? DEFAULT_CACHE_STORE : opts.cache

if (!store) {
throw new Error(`Cache store not provided.`)
}

let key = `${opts.method}:${opts.path}`
console.log('getting key: ' + key)

let entries = store.get(key)

if (Array.isArray(entries) && entries.length === 0 && opts.method === 'HEAD') {
Expand All @@ -219,11 +273,14 @@ export default (opts) => (dispatch) => (opts, handler) => {
try {
handler.onConnect(abort)
signal.throwIfAborted()

handler.onHeaders(statusCode, rawHeaders, resume, statusMessage)
signal.throwIfAborted()

if (opts.method !== 'HEAD') {
for (const chunk of body) {
const ret = handler.onData(chunk)

signal.throwIfAborted()
if (ret === false) {
// TODO (fix): back pressure...
Expand All @@ -239,8 +296,6 @@ export default (opts) => (dispatch) => (opts, handler) => {

return true
} else {
handler.opts = opts
handler.entries = entries
return dispatch(opts, new CacheHandler({ handler, store, key }))
return dispatch(opts, new CacheHandler({ handler, store, key, opts }))
}
}
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"cache-control-parser": "^2.0.6",
"cacheable-lookup": "^7.0.0",
"http-errors": "^2.0.0",
"lru-cache": "^11.0.0",
"undici": "^6.19.5"
},
"devDependencies": {
Expand Down
Loading

0 comments on commit 930bdb2

Please sign in to comment.