Skip to content

Commit

Permalink
fix: cache fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 13, 2024
1 parent 51836bb commit 9b65862
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 135 deletions.
20 changes: 3 additions & 17 deletions lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { Writable, Readable } = require('node:stream')
const { Writable } = require('node:stream')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
Expand Down Expand Up @@ -81,23 +81,9 @@ class MemoryCacheStore {
return undefined
}

/**
* @type {Readable | undefined}
*/
let readable
if (value.body) {
readable = new Readable()

for (const chunk of value.body) {
readable.push(chunk)
}

readable.push(null)
}

return {
response: value.opts,
body: readable
body: value.body
}
}

Expand Down Expand Up @@ -242,7 +228,7 @@ class MemoryCacheStore {
/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
*/
deleteByKey (key) {
delete (key) {
this.#data.delete(`${key.origin}:${key.path}`)
}

Expand Down
62 changes: 25 additions & 37 deletions lib/handler/cache-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler {
) {
// https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response
try {
this.#store.deleteByKey(this.#cacheKey).catch?.(noop)
this.#store.delete(this.#cacheKey).catch?.(noop)
} catch {
// Fail silently
}
Expand Down Expand Up @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler {
cacheControlDirectives
)

if (this.#cacheKey.method === 'HEAD') {
this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})
} else {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
// TODO (fix): Make error somehow observable?
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
}

Expand Down
117 changes: 59 additions & 58 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('node:stream')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
Expand Down Expand Up @@ -57,82 +58,77 @@ module.exports = (opts = {}) => {
// Where body can be a Buffer, string, stream or blob?
const result = store.get(cacheKey)
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}

/**
* @param {import('node:stream').Readable | undefined} stream
* @param {import('node:stream').Readable} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value
*/
const respondWithCachedValue = (stream, value) => {
assert(!stream || !stream.destroyed, 'stream should not be destroyed')
assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead')
try {
stream
?.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
process.nextTick(() => {
throw err
})
}
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
const respondWithCachedValue = (stream, { cachedAt, rawHeaders, statusCode, statusMessage }) => {
assert(!stream.destroyed, 'stream should not be destroyed')
assert(!stream.readableDidRead, 'stream should not be readableDidRead')

stream
.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
}
})
.on('close', function () {
if (!this.errored && typeof handler.onComplete === 'function') {
handler.onComplete([])
}
})

if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream?.destroy(err)
})
if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream.destroy(err)
})

if (stream?.destroyed) {
return
}
if (stream.destroyed) {
return
}
}

if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - value.cachedAt) / 1000)
if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((Date.now() - cachedAt) / 1000)

// TODO (fix): What if rawHeaders already contains age header?
const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]
// TODO (fix): What if rawHeaders already contains age header?
rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]

if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) {
stream?.pause()
}
if (handler.onHeaders(statusCode, rawHeaders, () => stream?.resume(), statusMessage) === false) {
stream.pause()
}
}

if (opts.method === 'HEAD') {
if (typeof handler.onComplete === 'function') {
handler.onComplete([])
if (opts.method === 'HEAD') {
stream.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}

stream?.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
stream.pause()
}
})
}
} catch (err) {
stream?.destroy(err)
})
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const handleStream = (result) => {
const { response: value, body: stream } = result
const { response: value, body } = result

// TODO (perf): Readable.from path can be optimized...
const stream = util.isStream(body)
? body
: Readable.from(body ?? [])

if (!stream && opts.method !== 'HEAD') {
throw new Error('stream is undefined but method isn\'t HEAD')
Expand Down Expand Up @@ -177,12 +173,17 @@ module.exports = (opts = {}) => {
if (typeof result.then === 'function') {
result.then((result) => {
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
handleStream(result)
}

handleStream(result)
}).catch(err => handler.onError(err))
}, err => {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
} else {
handleStream(result)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/util/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') {
throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`)
}

for (const fn of ['get', 'createWriteStream', 'deleteByKey']) {
for (const fn of ['get', 'createWriteStream', 'delete']) {
if (typeof store[fn] !== 'function') {
throw new TypeError(`${name} needs to have a \`${fn}()\` function`)
}
Expand Down
9 changes: 6 additions & 3 deletions test/cache-interceptor/cache-stores.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const { describe, test } = require('node:test')
const { deepStrictEqual, notEqual, equal } = require('node:assert')
const { Readable } = require('node:stream')
const { once } = require('node:events')
const MemoryCacheStore = require('../../lib/cache/memory-cache-store')

Expand All @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) {
equal(typeof store.isFull, 'boolean')
equal(typeof store.get, 'function')
equal(typeof store.createWriteStream, 'function')
equal(typeof store.deleteByKey, 'function')
equal(typeof store.delete, 'function')
})

// Checks that it can store & fetch different responses
Expand Down Expand Up @@ -268,9 +269,11 @@ function writeResponse (stream, body) {
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @returns {Promise<import('../../types/cache-interceptor.d.ts').default.GetResult | { body: Buffer[] }>}
*/
async function readResponse ({ response, body: stream }) {
async function readResponse ({ response, body: src }) {
notEqual(response, undefined)
notEqual(stream, undefined)
notEqual(src, undefined)

const stream = Readable.from(src)

/**
* @type {Buffer[]}
Expand Down
20 changes: 10 additions & 10 deletions test/interceptors/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,21 @@ describe('Cache Interceptor', () => {
})
})

test('unsafe methods call the store\'s deleteByKey function', async () => {
test('unsafe methods call the store\'s delete function', async () => {
const server = createServer((_, res) => {
res.end('asd')
}).listen(0)

after(() => server.close())
await once(server, 'listening')

let deleteByKeyCalled = false
let deleteCalled = false
const store = new cacheStores.MemoryCacheStore()

const originalDeleteByKey = store.deleteByKey.bind(store)
store.deleteByKey = (key) => {
deleteByKeyCalled = true
originalDeleteByKey(key)
const originaldelete = store.delete.bind(store)
store.delete = (key) => {
deleteCalled = true
originaldelete(key)
}

const client = new Client(`http://localhost:${server.address().port}`)
Expand All @@ -281,7 +281,7 @@ describe('Cache Interceptor', () => {
path: '/'
})

equal(deleteByKeyCalled, false)
equal(deleteCalled, false)

// Make sure other safe methods that we don't want to cache don't cause a cache purge
await client.request({
Expand All @@ -290,19 +290,19 @@ describe('Cache Interceptor', () => {
path: '/'
})

strictEqual(deleteByKeyCalled, false)
strictEqual(deleteCalled, false)

// Make sure the common unsafe methods cause cache purges
for (const method of ['POST', 'PUT', 'PATCH', 'DELETE']) {
deleteByKeyCalled = false
deleteCalled = false

await client.request({
origin: 'localhost',
method,
path: '/'
})

equal(deleteByKeyCalled, true, method)
equal(deleteCalled, true, method)
}
})

Expand Down
2 changes: 1 addition & 1 deletion test/types/cache-interceptor.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const store: CacheInterceptor.CacheStore = {
throw new Error('stub')
},

deleteByKey (_: CacheInterceptor.CacheKey): void | Promise<void> {
delete (_: CacheInterceptor.CacheKey): void | Promise<void> {
throw new Error('stub')
}
}
Expand Down
Loading

0 comments on commit 9b65862

Please sign in to comment.