diff --git a/src/bindings.d.ts b/src/bindings.d.ts index ebc7083..09411f3 100644 --- a/src/bindings.d.ts +++ b/src/bindings.d.ts @@ -1,16 +1,26 @@ import { CID } from '@web3-storage/gateway-lib/handlers' import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts' +import { Environment as CarParkFetchEnvironment } from './middleware/withCarParkFetch.types.ts' import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts' -import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts' +import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withContentClaimsDagula.types.ts' import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts' +import { Environment as EgressClientEnvironment } from './middleware/withEgressClient.types.ts' +import { Environment as GatewayIdentityEnvironment } from './middleware/withGatewayIdentity.types.ts' +import { Environment as DelegationsStorageEnvironment } from './middleware/withDelegationsStorage.types.ts' +import { Environment as LocatorEnvironment } from './middleware/withLocator.types.ts' import { UnknownLink } from 'multiformats' import { DIDKey } from '@ucanto/principal/ed25519' export interface Environment - extends CarBlockEnvironment, - RateLimiterEnvironment, - ContentClaimsDagulaEnvironment, - EgressTrackerEnvironment { + extends RateLimiterEnvironment, + CarBlockEnvironment, + CarParkFetchEnvironment, + ContentClaimsDagulaEnvironment, + EgressClientEnvironment, + EgressTrackerEnvironment, + GatewayIdentityEnvironment, + DelegationsStorageEnvironment, + LocatorEnvironment { VERSION: string CONTENT_CLAIMS_SERVICE_URL?: string HONEYCOMB_API_KEY: string diff --git a/src/middleware/withContentClaimsDagula.js b/src/middleware/withContentClaimsDagula.js index 4c93891..56a79b5 100644 --- a/src/middleware/withContentClaimsDagula.js +++ 
b/src/middleware/withContentClaimsDagula.js @@ -1,5 +1,7 @@ import { Dagula } from 'dagula' +import { base58btc } from 'multiformats/bases/base58' import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching' +import * as dagPb from '@ipld/dag-pb' /** * @import { @@ -16,6 +18,7 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching' /** * Creates a dagula instance backed by content claims. + * Get operations for DAG Protobuf content are cached if the DAGPB_CONTENT_CACHE is enabled. * * @type {( * Middleware< @@ -31,6 +34,10 @@ export function withContentClaimsDagula (handler) { const fetcher = BatchingFetcher.create(locator, ctx.fetch) const dagula = new Dagula({ async get (cid) { + const dagPbContent = await getDagPbContent(env, fetcher, cid, ctx) + if (dagPbContent) { + return dagPbContent + } const res = await fetcher.fetch(cid.multihash) return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined }, @@ -46,3 +53,85 @@ export function withContentClaimsDagula (handler) { return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula }) } } + +/** + * Returns the cached DAG Protobuf bytes if they exist, otherwise fetches the DAG Protobuf bytes + * from the fetcher and caches them in the KV store. 
+ * + * @param {Environment} env + * @param {import('@web3-storage/blob-fetcher').Fetcher} fetcher + * @param {import('multiformats').UnknownLink} cid + * @param {import('@web3-storage/gateway-lib').Context} ctx + * @returns {Promise<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array } | undefined>} + */ +async function getDagPbContent (env, fetcher, cid, ctx) { + if (env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code) { + const cachedBytes = await getCachedDagPbBytes(env, cid) + if (cachedBytes) { + return { cid, bytes: cachedBytes } + } + + const res = await fetcher.fetch(cid.multihash) + if (res.ok) { + const bytes = await res.ok.bytes() + const dagPbNode = dagPb.decode(bytes) + if (dagPbNode.Links && dagPbNode.Links.length === 0) { + // Old DAG PB nodes have no links ("raw" blocks as leaves), so we don't want to cache them + return { cid, bytes } + } + ctx.waitUntil(cacheDagPbBytes(env, cid, bytes)) + return { cid, bytes } + } + } + return undefined +} + +/** + * Caches the DAG Protobuf content into the KV store if the content size is less than or equal to the max size. + * The content is cached for the duration of the TTL (seconds), if the TTL is not set, the content is cached indefinitely. + * + * @param {Environment} env + * @param {import('multiformats').UnknownLink} cid + * @param {Uint8Array} bytes + * @returns {Promise<void>} + */ +async function cacheDagPbBytes (env, cid, bytes) { + const maxSize = env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB ? parseInt(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB) * 1024 * 1024 : undefined + if (maxSize && bytes.length <= maxSize) { + try { + const ttlSeconds = env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS ? parseInt(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS) : 0 + const key = getDagPbKey(cid) + await env.DAGPB_CONTENT_CACHE.put(key, bytes, { + expirationTtl: ttlSeconds > 60 ? 
ttlSeconds : undefined + }) + } catch (/** @type {any} */ error) { + console.error(error) + } + } +} + +/** + * Returns the cached DAG Protobuf bytes if they exist, otherwise returns null. + * + * @param {Environment} env + * @param {import('multiformats').UnknownLink} cid + * @returns {Promise<Uint8Array | null>} + */ +async function getCachedDagPbBytes (env, cid) { + const key = getDagPbKey(cid) + const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(key, 'arrayBuffer') + if (dagPbBytes) { + return new Uint8Array(dagPbBytes) + } + return null +} + +/** + * Returns the base58btc encoded key for the DAG Protobuf content in the KV store. + * + * @param {import('multiformats').UnknownLink} cid + * @returns {string} + */ +function getDagPbKey (cid) { + return base58btc.encode(cid.multihash.bytes) +} diff --git a/src/middleware/withContentClaimsDagula.types.ts b/src/middleware/withContentClaimsDagula.types.ts index 4618fce..54a7d79 100644 --- a/src/middleware/withContentClaimsDagula.types.ts +++ b/src/middleware/withContentClaimsDagula.types.ts @@ -1,5 +1,24 @@ +import { KVNamespace } from '@cloudflare/workers-types' import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib' export interface Environment extends MiddlewareEnvironment { CONTENT_CLAIMS_SERVICE_URL?: string + /** + * The KV namespace that stores the DAGPB content cache. + */ + DAGPB_CONTENT_CACHE: KVNamespace + /** + * The number that represents when to expire the key-value pair in seconds from now. + * The minimum value is 60 seconds. Any value less than 60 seconds will not be used. + */ + FF_DAGPB_CONTENT_CACHE_TTL_SECONDS?: string + /** + * The maximum size of the key-value pair in MB. + * The minimum value is 1 MB. Any value less than 1MB will not be used. + */ + FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB?: string + /** + * The flag that enables the DAGPB content cache. 
+ */ + FF_DAGPB_CONTENT_CACHE_ENABLED: string } \ No newline at end of file diff --git a/test/miniflare/freeway.spec.js b/test/miniflare/freeway.spec.js index ff8a7ea..cdccde1 100644 --- a/test/miniflare/freeway.spec.js +++ b/test/miniflare/freeway.spec.js @@ -1,7 +1,7 @@ import { describe, before, beforeEach, after, it } from 'node:test' import assert from 'node:assert' import { randomBytes } from 'node:crypto' -import { Miniflare } from 'miniflare' +import { Log, LogLevel, Miniflare } from 'miniflare' import * as Link from 'multiformats/link' import { sha256 } from 'multiformats/hashes/sha2' import * as raw from 'multiformats/codecs/raw' @@ -49,17 +49,26 @@ describe('freeway', () => { const { port } = server.address() url = new URL(`http://127.0.0.1:${port}`) miniflare = new Miniflare({ + host: '127.0.0.1', + port: 8787, + inspectorPort: 9898, + log: new Log(LogLevel.INFO), + cache: false, // Disable Worker Global Cache to test cache middlewares bindings: { CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString(), CARPARK_PUBLIC_BUCKET_URL: url.toString(), - GATEWAY_SERVICE_DID: 'did:example:gateway' + GATEWAY_SERVICE_DID: 'did:example:gateway', + DAGPB_CONTENT_CACHE: 'DAGPB_CONTENT_CACHE', + FF_DAGPB_CONTENT_CACHE_ENABLED: 'true', + FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: 300, + FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: 2 }, - inspectorPort: 9898, scriptPath: 'dist/worker.mjs', modules: true, compatibilityFlags: ['nodejs_compat'], compatibilityDate: '2024-09-23', - r2Buckets: ['CARPARK'] + r2Buckets: ['CARPARK'], + kvNamespaces: ['DAGPB_CONTENT_CACHE'] }) bucket = await miniflare.getR2Bucket('CARPARK') @@ -70,10 +79,15 @@ describe('freeway', () => { builder = new Builder(bucket) }) - beforeEach(() => { + beforeEach(async () => { claimsService.resetCallCount() claimsService.resetClaims() bucketService.resetCallCount() + const dagpbCache = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE') + const keys = await dagpbCache.list() + for (const key of keys.keys) { + await 
dagpbCache.delete(key.name) + } }) after(() => { @@ -481,4 +495,88 @@ describe('freeway', () => { } })) }) + + it('should be faster to get a file in a directory when the protobuf directory structure is cached', async () => { + // Generate 3 files wrapped in a folder, >2MB each to force a unixfs file header block (dag protobuf) + const input = [ + new File([randomBytes(2_050_550)], 'data.txt'), + new File([randomBytes(2_050_550)], 'image.png'), + new File([randomBytes(2_050_550)], 'image2.png') + ] + // Adding to the builder will generate the unixfs file header block + const { root, shards } = await builder.add(input) + assert.equal(root.code, 112, 'Root should be a protobuf directory code 112') + + // Generate claims for the shards + for (const shard of shards) { + const location = new URL(toBlobKey(shard.multihash), url) + const res = await fetch(location) + assert(res.body) + const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location) + claimsService.addClaims(claims) + } + + // Check that the cache is empty + const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE') + const cachedContent1 = await dagpb.list() + assert.equal(cachedContent1.keys.length, 0, 'Cache should be empty') + + // First request adds the file to the cache, so it takes longer + const start = performance.now() + const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, { + headers: { + 'Cache-Control': 'no-cache' + } + }) + if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`) + const end = performance.now() + assertBlobEqual(input[2], await res.blob()) + + const cachedContent2 = await dagpb.list() + assert(cachedContent2.keys.length > 0, 'Cache should have one or more keys') + + // Second request retrieves the file from the cache, so it should take less time than the first request + const start2 = performance.now() + console.log('SECOND REQUEST') + const res2 = await 
miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, { + headers: { + 'Cache-Control': 'no-cache' + } + }) + if (!res2.ok) assert.fail(`unexpected response: ${await res2.text()}`) + const end2 = performance.now() + assertBlobEqual(input[2], await res2.blob()) + assert(end2 - start2 < end - start, 'Second request should take less time than the first request') + }) + + it('should not cache content if it is not dag protobuf content', async () => { + // Generate 1 small file (256 bytes) and do not wrap it in a folder + const input = new File([randomBytes(256)], 'data.txt') + const { root, shards } = await builder.add(input) + assert.equal(root.code, 85, 'Root should be a raw file code 85') + + // Generate claims for the shards + for (const shard of shards) { + const location = new URL(toBlobKey(shard.multihash), url) + const res = await fetch(location) + assert(res.body) + const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location) + claimsService.addClaims(claims) + } + + // Check that the cache is empty + const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE') + const cachedContent = await dagpb.list() + assert.equal(cachedContent.keys.length, 0, 'Cache should be empty') + + // It should not add the file to the cache, because it is not dag protobuf content + const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`, { + headers: { + 'Cache-Control': 'no-cache' + } + }) + if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`) + assertBlobEqual(input, await res.blob()) + assert.equal(cachedContent.keys.length, 0, 'Cache should be empty') + }) }) diff --git a/wrangler.toml b/wrangler.toml index f2fb5ae..89aa74e 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -42,7 +42,8 @@ r2_buckets = [ ] kv_namespaces = [ { binding = "AUTH_TOKEN_METADATA", id = "f848730e45d94f17bcaf3b6d0915da40" }, - { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = 
"b2984f16c21e4991a644683c00d80033" } + { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = "b2984f16c21e4991a644683c00d80033" }, + { binding = "DAGPB_CONTENT_CACHE", id = "e1339664614940a0804a3020ce3dae12" } ] [env.production.build] @@ -53,9 +54,13 @@ MAX_SHARDS = "825" FF_RATE_LIMITER_ENABLED = "false" FF_EGRESS_TRACKER_ENABLED = "true" FF_TELEMETRY_ENABLED = "true" -TELEMETRY_RATIO = 0.0005 FF_DELEGATIONS_STORAGE_ENABLED = "true" FF_RAMP_UP_PROBABILITY = "0" +# Cache for 30 days by default +FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 2_592_000 +FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2 +FF_DAGPB_CONTENT_CACHE_ENABLED = "false" +TELEMETRY_RATIO = 0.0005 GATEWAY_SERVICE_DID = "did:web:w3s.link" UPLOAD_SERVICE_DID = "did:web:web3.storage" CONTENT_CLAIMS_SERVICE_URL = "https://claims.web3.storage" @@ -76,7 +81,8 @@ r2_buckets = [ ] kv_namespaces = [ { binding = "AUTH_TOKEN_METADATA", id = "b618bb05deb8493f944ef4a0f538030c" }, - { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = "99ae45f8b5b3478a9df09302c27e81a3" } + { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = "99ae45f8b5b3478a9df09302c27e81a3" }, + { binding = "DAGPB_CONTENT_CACHE", id = "c70a74363e7a4f06ad39fa3022aab7c7" } ] [env.staging.build] @@ -87,9 +93,12 @@ MAX_SHARDS = "825" FF_RATE_LIMITER_ENABLED = "false" FF_EGRESS_TRACKER_ENABLED = "true" FF_TELEMETRY_ENABLED = "true" -TELEMETRY_RATIO = 1.0 FF_DELEGATIONS_STORAGE_ENABLED = "true" FF_RAMP_UP_PROBABILITY = "0" +FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 300 +FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2 +FF_DAGPB_CONTENT_CACHE_ENABLED = "true" +TELEMETRY_RATIO = 1.0 GATEWAY_SERVICE_DID = "did:web:staging.w3s.link" UPLOAD_SERVICE_DID = "did:web:staging.web3.storage" CONTENT_CLAIMS_SERVICE_URL = "https://staging.claims.web3.storage" @@ -109,6 +118,9 @@ r2_buckets = [ DEBUG = "true" FF_RATE_LIMITER_ENABLED = "false" FF_EGRESS_TRACKER_ENABLED = "false" +FF_DAGPB_CONTENT_CACHE_ENABLED = "true" +FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2 +FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 300 
FF_TELEMETRY_ENABLED = "true" TELEMETRY_RATIO = 1.0 FF_RAMP_UP_PROBABILITY = "100" @@ -144,12 +156,16 @@ account_id = "fffa4b4363a7e5250af8357087263b3a" # r2_buckets = [ # { binding = "CARPARK", bucket_name = "carpark-fforbeck-0", preview_bucket_name = "carpark-fforbeck-preview-0" } # ] +# r2_buckets = [ +# { binding = "CARPARK", bucket_name = "carpark-staging-0" } +# ] r2_buckets = [ - { binding = "CARPARK", bucket_name = "carpark-staging-0" } + { binding = "CARPARK", bucket_name = "carpark-prod-0", preview_bucket_name = "carpark-prod-0" } ] kv_namespaces = [ { binding = "AUTH_TOKEN_METADATA", id = "b618bb05deb8493f944ef4a0f538030c" }, - { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = "26cc47fec09749bb9ee42bc6407f9a9d" } + { binding = "CONTENT_SERVE_DELEGATIONS_STORE", id = "26cc47fec09749bb9ee42bc6407f9a9d" }, + { binding = "DAGPB_CONTENT_CACHE", id = "3f0c253b90fc48c1b384f1563ede54f9" } ] [env.fforbeck.vars] @@ -157,14 +173,25 @@ DEBUG = "true" FF_RATE_LIMITER_ENABLED = "false" FF_EGRESS_TRACKER_ENABLED = "true" FF_TELEMETRY_ENABLED = "true" -TELEMETRY_RATIO = 1.0 FF_DELEGATIONS_STORAGE_ENABLED = "true" -FF_RAMP_UP_PROBABILITY = "100" -GATEWAY_SERVICE_DID = "did:web:staging.w3s.link" -UPLOAD_SERVICE_DID = "did:web:staging.web3.storage" -CONTENT_CLAIMS_SERVICE_URL = "https://staging.claims.web3.storage" -UPLOAD_API_URL = "https://staging.up.web3.storage" -INDEXING_SERVICE_URL = "https://staging.indexer.storacha.network/" +FF_RAMP_UP_PROBABILITY = "0" +FF_DAGPB_CONTENT_CACHE_TTL_SECONDS = 300 +FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB = 2 +FF_DAGPB_CONTENT_CACHE_ENABLED = "true" +TELEMETRY_RATIO = 1.0 +### staging +# CONTENT_CLAIMS_SERVICE_URL = "https://staging.claims.web3.storage" +# CARPARK_PUBLIC_BUCKET_URL = "https://carpark-staging-0.r2.w3s.link" +# GATEWAY_SERVICE_DID = "did:web:staging.w3s.link" +#UPLOAD_SERVICE_DID = "did:web:staging.web3.storage" +#UPLOAD_API_URL = "https://staging.up.web3.storage" +#INDEXING_SERVICE_URL = 
"https://staging.indexer.storacha.network/" +### prod +CONTENT_CLAIMS_SERVICE_URL = "https://claims.web3.storage" +# CARPARK_PUBLIC_BUCKET_URL = "https://carpark-prod-0.r2.w3s.link" +GATEWAY_SERVICE_DID = "did:web:w3s.link" +UPLOAD_SERVICE_DID = "did:web:web3.storage" +UPLOAD_API_URL = "https://up.web3.storage" [[env.fforbeck.unsafe.bindings]] name = "RATE_LIMITER"