feat(dag-protobuf): cache dag pb directory structure and block indexes (#147)

### Context
Requests that fetch a DAG Protobuf directory structure by CID execute the following steps:
1. Perform the content claims lookups to identify where we are pulling data from
2. Fetch cid `bafy...cid`, which represents the folder containing the target file, so that we can determine the verifiable cid for the file (let's call that `bafy...file`)
3. Fetch cid `bafy...file` to get the root block of the file, which in UnixFS contains NO raw data but rather a list of the sub-blocks that make up the file (let's call those `bafy...bytes1` and `bafy...bytes2`)
4. Fetch the first raw data block(s) so we can send the first byte to the client

This PR introduces a caching strategy for steps 2 to 4: instead of fetching the directory structure from the locator and navigating the DAG on every request, we cache DAGs that have a Protobuf structure and a content size <= 2MB.
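
A condensed sketch of the resulting cache-first block `get` is shown below. The full implementation is in `withContentClaimsDagula.js` in the diff; the feature flag, TTL and max-size guards are omitted here, and `kv`, `fetcher` and `waitUntil` stand in for the worker's KV binding, blob fetcher and execution context:

```js
import * as dagPb from '@ipld/dag-pb'
import { base58btc } from 'multiformats/bases/base58'

// Sketch only: the feature flag, TTL and size checks from the real middleware are omitted.
async function get (cid, { kv, fetcher, waitUntil }) {
  if (cid.code !== dagPb.code) {
    // Not dag-pb: fetch as before, nothing is cached.
    const res = await fetcher.fetch(cid.multihash)
    return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
  }

  const key = base58btc.encode(cid.multihash.bytes)
  const cached = await kv.get(key, 'arrayBuffer')
  if (cached) return { cid, bytes: new Uint8Array(cached) } // cache hit: no locator round trip

  const res = await fetcher.fetch(cid.multihash)
  if (!res.ok) return undefined
  const bytes = await res.ok.bytes()
  // Only nodes that link to other blocks (directory/file headers) are cached; the write
  // happens off the request path via waitUntil.
  if (dagPb.decode(bytes).Links.length > 0) waitUntil(kv.put(key, bytes))
  return { cid, bytes }
}
```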

### Changes
- Updated the `withContentClaimsDagula` middleware to cache DAG PB content requests
- New KV store
  - `DAGPB_CONTENT_CACHE`
- Caching rules (example values below)
  - `FF_DAGPB_CONTENT_CACHE_TTL_SECONDS`: The number of seconds from now after which the key-value pair expires. The minimum value is 60 seconds; any smaller value will not be used. We use a **30-day TTL** by default in the Production environment.
  - `FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB`: The maximum size of a cached value, in MB. The minimum value is 1 MB; any smaller value will not be used. We use a **2MB max size** by default.
  - `FF_DAGPB_CONTENT_CACHE_ENABLED`: The flag that enables the DAG PB content cache. The cache is **disabled in prod** by default.
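
For illustration, a production-like configuration and the thresholds it implies; the concrete values below are assumptions, not settings shipped by this PR:

```js
// Hypothetical configuration values (assumed for illustration only):
const env = {
  FF_DAGPB_CONTENT_CACHE_ENABLED: 'true',
  FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: '2592000', // 30 days
  FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: '2'
}

// How the middleware interprets them (see cacheDagPbBytes in the diff):
const maxSizeBytes = parseInt(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB) * 1024 * 1024 // 2,097,152 bytes
const ttlSeconds = parseInt(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS) // 2,592,000 seconds
// Blocks larger than maxSizeBytes are never written to the cache; TTL values that do not
// exceed the 60-second minimum result in entries stored without an expiration.
```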

### Samples
**2MB file - no cache**

![2mb-file-no-cache](https://github.com/user-attachments/assets/9dfe2a5c-ef19-467a-a7e2-3b5f83610a94)
- 5.2 seconds

**2MB file - cached**

![2mb-file-cached](https://github.com/user-attachments/assets/08ca8782-7873-4b55-85d0-a90b78ea52eb)
- 1.4 seconds

### KV Limits
These are the Cloudflare Workers KV limits relevant to this cache:
- Reads: unlimited
- Writes (different keys): unlimited
- Writes (same key): 1 write / sec (rate limited)
- Storage per account & per namespace: unlimited
- Key size: <= 512 bytes (see the key derivation example below)
- Value size: <= 25MiB
- Minimum cache TTL: 60 seconds
- Higher limit? -> https://forms.gle/ukpeZVLWLnKeixDu7
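
The cache key is the base58btc-encoded multihash of the requested CID (see `getDagPbKey` in the diff), so keys stay far below the 512-byte limit. A small sketch, assuming sha2-256 hashes like those produced by the test builder:

```js
import { CID } from 'multiformats/cid'
import { sha256 } from 'multiformats/hashes/sha2'
import { base58btc } from 'multiformats/bases/base58'
import * as dagPb from '@ipld/dag-pb'

// Build a minimal dag-pb node and derive the KV key the middleware would use for it.
const bytes = dagPb.encode({ Links: [] })
const digest = await sha256.digest(bytes)
const cid = CID.createV1(dagPb.code, digest)
const key = base58btc.encode(cid.multihash.bytes)
console.log(key, key.length) // "zQm...", under 50 characters for a sha2-256 multihash
```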

resolves storacha/project-tracking#301
fforbeck authored Jan 28, 2025
1 parent f61e4c6 commit e367852
Showing 5 changed files with 266 additions and 23 deletions.
20 changes: 15 additions & 5 deletions src/bindings.d.ts
@@ -1,16 +1,26 @@
import { CID } from '@web3-storage/gateway-lib/handlers'
import { Environment as RateLimiterEnvironment } from './middleware/withRateLimit.types.ts'
import { Environment as CarParkFetchEnvironment } from './middleware/withCarParkFetch.types.ts'
import { Environment as CarBlockEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withCarBlockHandler.types.ts'
import { Environment as ContentClaimsDagulaEnvironment } from './middleware/withContentClaimsDagula.types.ts'
import { Environment as EgressTrackerEnvironment } from './middleware/withEgressTracker.types.ts'
import { Environment as EgressClientEnvironment } from './middleware/withEgressClient.types.ts'
import { Environment as GatewayIdentityEnvironment } from './middleware/withGatewayIdentity.types.ts'
import { Environment as DelegationsStorageEnvironment } from './middleware/withDelegationsStorage.types.ts'
import { Environment as LocatorEnvironment } from './middleware/withLocator.types.ts'
import { UnknownLink } from 'multiformats'
import { DIDKey } from '@ucanto/principal/ed25519'

export interface Environment
extends CarBlockEnvironment,
RateLimiterEnvironment,
ContentClaimsDagulaEnvironment,
EgressTrackerEnvironment {
extends RateLimiterEnvironment,
CarBlockEnvironment,
CarParkFetchEnvironment,
ContentClaimsDagulaEnvironment,
EgressClientEnvironment,
EgressTrackerEnvironment,
GatewayIdentityEnvironment,
DelegationsStorageEnvironment,
LocatorEnvironment {
VERSION: string
CONTENT_CLAIMS_SERVICE_URL?: string
HONEYCOMB_API_KEY: string
89 changes: 89 additions & 0 deletions src/middleware/withContentClaimsDagula.js
@@ -1,5 +1,7 @@
import { Dagula } from 'dagula'
import { base58btc } from 'multiformats/bases/base58'
import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'
import * as dagPb from '@ipld/dag-pb'

/**
* @import {
@@ -16,6 +18,7 @@ import * as BatchingFetcher from '@web3-storage/blob-fetcher/fetcher/batching'

/**
* Creates a dagula instance backed by content claims.
* Get operations for DAG Protobuf content are cached if the DAGPB_CONTENT_CACHE is enabled.
*
* @type {(
* Middleware<
@@ -31,6 +34,10 @@ export function withContentClaimsDagula (handler) {
const fetcher = BatchingFetcher.create(locator, ctx.fetch)
const dagula = new Dagula({
async get (cid) {
const dagPbContent = await getDagPbContent(env, fetcher, cid, ctx)
if (dagPbContent) {
return dagPbContent
}
const res = await fetcher.fetch(cid.multihash)
return res.ok ? { cid, bytes: await res.ok.bytes() } : undefined
},
@@ -46,3 +53,85 @@ return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })
return handler(request, env, { ...ctx, blocks: dagula, dag: dagula, unixfs: dagula })
}
}

/**
* Returns the cached DAG Protobuf bytes if they exist, otherwise fetches the DAG Protobuf bytes
* from the fetcher and caches them in the KV store.
*
* @param {Environment} env
* @param {import('@web3-storage/blob-fetcher').Fetcher} fetcher
* @param {import('multiformats').UnknownLink} cid
* @param {import('@web3-storage/gateway-lib').Context} ctx
* @returns {Promise<{ cid: import('multiformats').UnknownLink, bytes: Uint8Array } | undefined>}
*/
async function getDagPbContent (env, fetcher, cid, ctx) {
if (env.FF_DAGPB_CONTENT_CACHE_ENABLED === 'true' && cid.code === dagPb.code) {
const cachedBytes = await getCachedDagPbBytes(env, cid)
if (cachedBytes) {
return { cid, bytes: cachedBytes }
}

const res = await fetcher.fetch(cid.multihash)
if (res.ok) {
const bytes = await res.ok.bytes()
const dagPbNode = dagPb.decode(bytes)
if (dagPbNode.Links && dagPbNode.Links.length === 0) {
// Old DAG PB nodes have no links ("raw" blocks as leaves), so we don't want to cache them
return { cid, bytes }
}
ctx.waitUntil(cacheDagPbBytes(env, cid, bytes))
return { cid, bytes }
}
}
return undefined
}

/**
* Caches the DAG Protobuf content into the KV store if the content size is less than or equal to the max size.
* The content is cached for the duration of the TTL (seconds), if the TTL is not set, the content is cached indefinitely.
*
* @param {Environment} env
* @param {import('multiformats').UnknownLink} cid
* @param {Uint8Array} bytes
* @returns {Promise<void>}
*/
async function cacheDagPbBytes (env, cid, bytes) {
const maxSize = env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB ? parseInt(env.FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB) * 1024 * 1024 : undefined
if (maxSize && bytes.length <= maxSize) {
try {
const ttlSeconds = env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS ? parseInt(env.FF_DAGPB_CONTENT_CACHE_TTL_SECONDS) : 0
const key = getDagPbKey(cid)
await env.DAGPB_CONTENT_CACHE.put(key, bytes, {
expirationTtl: ttlSeconds > 60 ? ttlSeconds : undefined
})
} catch (/** @type {any} */ error) {
console.error(error)
}
}
}

/**
* Returns the cached DAG Protobuf bytes if they exist, otherwise returns null.
*
* @param {Environment} env
* @param {import('multiformats').UnknownLink} cid
* @returns {Promise<Uint8Array | null>}
*/
async function getCachedDagPbBytes (env, cid) {
const key = getDagPbKey(cid)
const dagPbBytes = await env.DAGPB_CONTENT_CACHE.get(key, 'arrayBuffer')
if (dagPbBytes) {
return new Uint8Array(dagPbBytes)
}
return null
}

/**
* Returns the base58btc encoded key for the DAG Protobuf content in the KV store.
*
* @param {import('multiformats').UnknownLink} cid
* @returns {string}
*/
function getDagPbKey (cid) {
return base58btc.encode(cid.multihash.bytes)
}
19 changes: 19 additions & 0 deletions src/middleware/withContentClaimsDagula.types.ts
@@ -1,5 +1,24 @@
import { KVNamespace } from '@cloudflare/workers-types'
import { Environment as MiddlewareEnvironment } from '@web3-storage/gateway-lib'

export interface Environment extends MiddlewareEnvironment {
CONTENT_CLAIMS_SERVICE_URL?: string
/**
* The KV namespace that stores the DAGPB content cache.
*/
DAGPB_CONTENT_CACHE: KVNamespace
/**
* The number that represents when to expire the key-value pair in seconds from now.
* The minimum value is 60 seconds. Any value less than 60 seconds will not be used.
*/
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS?: string
/**
* The maximum size of the key-value pair in MB.
* The minimum value is 1 MB. Any value less than 1MB will not be used.
*/
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB?: string
/**
* The flag that enables the DAGPB content cache.
*/
FF_DAGPB_CONTENT_CACHE_ENABLED: string
}
108 changes: 103 additions & 5 deletions test/miniflare/freeway.spec.js
@@ -1,7 +1,7 @@
import { describe, before, beforeEach, after, it } from 'node:test'
import assert from 'node:assert'
import { randomBytes } from 'node:crypto'
import { Miniflare } from 'miniflare'
import { Log, LogLevel, Miniflare } from 'miniflare'
import * as Link from 'multiformats/link'
import { sha256 } from 'multiformats/hashes/sha2'
import * as raw from 'multiformats/codecs/raw'
@@ -49,17 +49,26 @@ describe('freeway', () => {
const { port } = server.address()
url = new URL(`http://127.0.0.1:${port}`)
miniflare = new Miniflare({
host: '127.0.0.1',
port: 8787,
inspectorPort: 9898,
log: new Log(LogLevel.INFO),
cache: false, // Disable Worker Global Cache to test cache middlewares
bindings: {
CONTENT_CLAIMS_SERVICE_URL: claimsService.url.toString(),
CARPARK_PUBLIC_BUCKET_URL: url.toString(),
GATEWAY_SERVICE_DID: 'did:example:gateway'
GATEWAY_SERVICE_DID: 'did:example:gateway',
DAGPB_CONTENT_CACHE: 'DAGPB_CONTENT_CACHE',
FF_DAGPB_CONTENT_CACHE_ENABLED: 'true',
FF_DAGPB_CONTENT_CACHE_TTL_SECONDS: 300,
FF_DAGPB_CONTENT_CACHE_MAX_SIZE_MB: 2
},
inspectorPort: 9898,
scriptPath: 'dist/worker.mjs',
modules: true,
compatibilityFlags: ['nodejs_compat'],
compatibilityDate: '2024-09-23',
r2Buckets: ['CARPARK']
r2Buckets: ['CARPARK'],
kvNamespaces: ['DAGPB_CONTENT_CACHE']
})

bucket = await miniflare.getR2Bucket('CARPARK')
@@ -70,10 +79,15 @@ describe('freeway', () => {
builder = new Builder(bucket)
})

beforeEach(() => {
beforeEach(async () => {
claimsService.resetCallCount()
claimsService.resetClaims()
bucketService.resetCallCount()
const dagpbCache = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
const keys = await dagpbCache.list()
for (const key of keys.keys) {
await dagpbCache.delete(key.name)
}
})

after(() => {
@@ -481,4 +495,88 @@ describe('freeway', () => {
}
}))
})

it('should be faster to get a file in a directory when the protobuf directory structure is cached', async () => {
// Generate 3 files wrapped in a folder, >2MB each to force a unixfs file header block (dag protobuf)
const input = [
new File([randomBytes(2_050_550)], 'data.txt'),
new File([randomBytes(2_050_550)], 'image.png'),
new File([randomBytes(2_050_550)], 'image2.png')
]
// Adding to the builder will generate the unixfs file header block
const { root, shards } = await builder.add(input)
assert.equal(root.code, 112, 'Root should be a protobuf directory code 112')

// Generate claims for the shards
for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
claimsService.addClaims(claims)
}

// Check that the cache is empty
const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
const cachedContent1 = await dagpb.list()
assert.equal(cachedContent1.keys.length, 0, 'Cache should be empty')

// First request adds the file to the cache, so it takes longer
const start = performance.now()
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, {
headers: {
'Cache-Control': 'no-cache'
}
})
if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)
const end = performance.now()
assertBlobEqual(input[2], await res.blob())

const cachedContent2 = await dagpb.list()
assert(cachedContent2.keys.length > 0, 'Cache should have one or more keys')

// Second request retrieves the file from the cache, so it should take less time than the first request
const start2 = performance.now()
console.log('SECOND REQUEST')
const res2 = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}/${input[2].name}`, {
headers: {
'Cache-Control': 'no-cache'
}
})
if (!res2.ok) assert.fail(`unexpected response: ${await res2.text()}`)
const end2 = performance.now()
assertBlobEqual(input[2], await res2.blob())
assert(end2 - start2 < end - start, 'Second request should take less time than the first request')
})

it('should not cache content if it is not dag protobuf content', async () => {
// Generate 1 file, >1MB each and do not wrap it in a folder
const input = new File([randomBytes(256)], 'data.txt')
const { root, shards } = await builder.add(input)
assert.equal(root.code, 85, 'Root should be a raw file code 85')

// Generate claims for the shards
for (const shard of shards) {
const location = new URL(toBlobKey(shard.multihash), url)
const res = await fetch(location)
assert(res.body)
const claims = await generateBlockLocationClaims(claimsService.signer, shard, res.body, location)
claimsService.addClaims(claims)
}

// Check that the cache is empty
const dagpb = await miniflare.getKVNamespace('DAGPB_CONTENT_CACHE')
const cachedContent = await dagpb.list()
assert.equal(cachedContent.keys.length, 0, 'Cache should be empty')

// It should not add the file to the cache, because it is not dag protobuf content
const res = await miniflare.dispatchFetch(`http://localhost:8787/ipfs/${root}`, {
headers: {
'Cache-Control': 'no-cache'
}
})
if (!res.ok) assert.fail(`unexpected response: ${await res.text()}`)
assertBlobEqual(input, await res.blob())
assert.equal(cachedContent.keys.length, 0, 'Cache should be empty')
})
})