Skip to content

Commit

Permalink
fix: memory store (#3834)
Browse files Browse the repository at this point in the history
* fix: memory store

Simplify and fix memory leak

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup

* fixup
  • Loading branch information
ronag authored Nov 18, 2024
1 parent 1779440 commit 70f7314
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 313 deletions.
314 changes: 84 additions & 230 deletions lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,23 @@
const { Writable } = require('node:stream')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheKey} CacheKey
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheValue} CacheValue
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
* @typedef {import('../../types/cache-interceptor.d.ts').default.GetResult} GetResult
*/

/**
* @implements {CacheStore}
*
* @typedef {{
* locked: boolean
* opts: import('../../types/cache-interceptor.d.ts').default.CachedResponse
* body?: Buffer[]
* }} MemoryStoreValue
*/
class MemoryCacheStore {
#maxCount = Infinity

#maxSize = Infinity
#maxEntrySize = Infinity

#entryCount = 0

/**
* @type {Map<string, Map<string, MemoryStoreValue[]>>}
*/
#data = new Map()
#size = 0
#count = 0
#entries = new Map()

/**
* @param {import('../../types/cache-interceptor.d.ts').default.MemoryCacheStoreOpts | undefined} [opts]
Expand All @@ -44,6 +41,17 @@ class MemoryCacheStore {
this.#maxCount = opts.maxCount
}

if (opts.maxSize !== undefined) {
if (
typeof opts.maxSize !== 'number' ||
!Number.isInteger(opts.maxSize) ||
opts.maxSize < 0
) {
throw new TypeError('MemoryCacheStore options.maxSize must be a non-negative integer')
}
this.#maxSize = opts.maxSize
}

if (opts.maxEntrySize !== undefined) {
if (
typeof opts.maxEntrySize !== 'number' ||
Expand All @@ -57,269 +65,115 @@ class MemoryCacheStore {
}
}

get isFull () {
return this.#entryCount >= this.#maxCount
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} req
* @returns {import('../../types/cache-interceptor.d.ts').default.GetResult | undefined}
*/
get (key) {
if (typeof key !== 'object') {
throw new TypeError(`expected key to be object, got ${typeof key}`)
}

const values = this.#getValuesForRequest(key, false)
if (!values) {
return undefined
}

const value = this.#findValue(key, values)

if (!value || value.locked) {
return undefined
}
const topLevelKey = `${key.origin}:${key.path}`

return { ...value.opts, body: value.body }
const now = Date.now()
const entry = this.#entries.get(topLevelKey)?.find((entry) => (
entry.deleteAt > now &&
entry.method === key.method &&
(entry.vary == null || Object.keys(entry.vary).every(headerName => entry.vary[headerName] === key.headers?.[headerName]))
))

return entry == null
? undefined
: {
statusMessage: entry.statusMessage,
statusCode: entry.statusCode,
rawHeaders: entry.rawHeaders,
body: entry.body,
cachedAt: entry.cachedAt,
staleAt: entry.staleAt,
deleteAt: entry.deleteAt
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
* @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} opts
* @param {import('../../types/cache-interceptor.d.ts').default.CacheValue} val
* @returns {Writable | undefined}
*/
createWriteStream (key, opts) {
createWriteStream (key, val) {
if (typeof key !== 'object') {
throw new TypeError(`expected key to be object, got ${typeof key}`)
}
if (typeof opts !== 'object') {
throw new TypeError(`expected value to be object, got ${typeof opts}`)
}

if (this.isFull) {
return undefined
if (typeof val !== 'object') {
throw new TypeError(`expected value to be object, got ${typeof val}`)
}

const values = this.#getValuesForRequest(key, true)

/**
* @type {(MemoryStoreValue & { index: number }) | undefined}
*/
let value = this.#findValue(key, values)
let valueIndex = value?.index
if (!value) {
// The value doesn't already exist, meaning we haven't cached this
// response before. Let's assign it a value and insert it into our data
// property.

if (this.isFull) {
// Or not, we don't have space to add another response
return undefined
}

this.#entryCount++

value = {
locked: true,
opts
}

// We want to sort our responses in decending order by their deleteAt
// timestamps so that deleting expired responses is faster
if (
values.length === 0 ||
opts.deleteAt < values[values.length - 1].deleteAt
) {
// Our value is either the only response for this path or our deleteAt
// time is sooner than all the other responses
values.push(value)
valueIndex = values.length - 1
} else if (opts.deleteAt >= values[0].deleteAt) {
// Our deleteAt is later than everyone elses
values.unshift(value)
valueIndex = 0
} else {
// We're neither in the front or the end, let's just binary search to
// find our stop we need to be in
let startIndex = 0
let endIndex = values.length
while (true) {
if (startIndex === endIndex) {
values.splice(startIndex, 0, value)
break
}

const middleIndex = Math.floor((startIndex + endIndex) / 2)
const middleValue = values[middleIndex]
if (opts.deleteAt === middleIndex) {
values.splice(middleIndex, 0, value)
valueIndex = middleIndex
break
} else if (opts.deleteAt > middleValue.opts.deleteAt) {
endIndex = middleIndex
continue
} else {
startIndex = middleIndex
continue
}
}
}
} else {
// Check if there's already another request writing to the value or
// a request reading from it
if (value.locked) {
return undefined
}

// Empty it so we can overwrite it
value.body = []
}
const topLevelKey = `${key.origin}:${key.path}`

let currentSize = 0
/**
* @type {Buffer[] | null}
*/
let body = key.method !== 'HEAD' ? [] : null
const maxEntrySize = this.#maxEntrySize
const store = this
const entry = { ...key, ...val, body: [], size: 0 }

const writable = new Writable({
return new Writable({
write (chunk, encoding, callback) {
if (key.method === 'HEAD') {
throw new Error('HEAD request shouldn\'t have a body')
}

if (!body) {
return callback()
}

if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding)
}

currentSize += chunk.byteLength
entry.size += chunk.byteLength

if (currentSize >= maxEntrySize) {
body = null
this.end()
shiftAtIndex(values, valueIndex)
return callback()
if (entry.size >= store.#maxEntrySize) {
this.destroy()
} else {
entry.body.push(chunk)
}

body.push(chunk)
callback()
callback(null)
},
final (callback) {
value.locked = false
if (body !== null) {
value.body = body
let entries = store.#entries.get(topLevelKey)
if (!entries) {
entries = []
store.#entries.set(topLevelKey, entries)
}
entries.push(entry)

store.#size += entry.size
store.#count += 1

if (store.#size > store.#maxSize || store.#count > store.#maxCount) {
for (const [key, entries] of store.#entries) {
for (const entry of entries.splice(0, entries.length / 2)) {
store.#size -= entry.size
store.#count -= 1
}
if (entries.length === 0) {
store.#entries.delete(key)
}
}
}

callback()
callback(null)
}
})

return writable
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
* @param {CacheKey} key
*/
delete (key) {
this.#data.delete(`${key.origin}:${key.path}`)
}

/**
* Gets all of the requests of the same origin, path, and method. Does not
* take the `vary` property into account.
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
* @param {boolean} [makeIfDoesntExist=false]
* @returns {MemoryStoreValue[] | undefined}
*/
#getValuesForRequest (key, makeIfDoesntExist) {
// https://www.rfc-editor.org/rfc/rfc9111.html#section-2-3
const topLevelKey = `${key.origin}:${key.path}`
let cachedPaths = this.#data.get(topLevelKey)
if (!cachedPaths) {
if (!makeIfDoesntExist) {
return undefined
}

cachedPaths = new Map()
this.#data.set(topLevelKey, cachedPaths)
}

let value = cachedPaths.get(key.method)
if (!value && makeIfDoesntExist) {
value = []
cachedPaths.set(key.method, value)
if (typeof key !== 'object') {
throw new TypeError(`expected key to be object, got ${typeof key}`)
}

return value
}

/**
* Given a list of values of a certain request, this decides the best value
* to respond with.
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} req
* @param {MemoryStoreValue[]} values
* @returns {(MemoryStoreValue & { index: number }) | undefined}
*/
#findValue (req, values) {
/**
* @type {MemoryStoreValue | undefined}
*/
let value
const now = Date.now()
for (let i = values.length - 1; i >= 0; i--) {
const current = values[i]
const currentCacheValue = current.opts
if (now >= currentCacheValue.deleteAt) {
// We've reached expired values, let's delete them
this.#entryCount -= values.length - i
values.length = i
break
}

let matches = true

if (currentCacheValue.vary) {
if (!req.headers) {
matches = false
break
}

for (const key in currentCacheValue.vary) {
if (currentCacheValue.vary[key] !== req.headers[key]) {
matches = false
break
}
}
}
const topLevelKey = `${key.origin}:${key.path}`

if (matches) {
value = {
...current,
index: i
}
break
}
for (const entry of this.#entries.get(topLevelKey) ?? []) {
this.#size -= entry.size
this.#count -= 1
}

return value
}
}

/**
* @param {any[]} array Array to modify
* @param {number} idx Index to delete
*/
function shiftAtIndex (array, idx) {
for (let i = idx + 1; idx < array.length; i++) {
array[i - 1] = array[i]
this.#entries.delete(topLevelKey)
}

array.length--
}

module.exports = MemoryCacheStore
Loading

0 comments on commit 70f7314

Please sign in to comment.