diff --git a/benchmarks/gc/package.json b/benchmarks/gc/package.json new file mode 100644 index 000000000..e3c016b82 --- /dev/null +++ b/benchmarks/gc/package.json @@ -0,0 +1,36 @@ +{ + "name": "benchmarks-gc", + "version": "1.0.0", + "main": "index.js", + "private": true, + "type": "module", + "scripts": { + "clean": "aegir clean", + "build": "aegir build --bundle false", + "lint": "aegir lint", + "dep-check": "aegir dep-check", + "start": "npm run build && node dist/src/index.js" + }, + "devDependencies": { + "@chainsafe/libp2p-noise": "^11.0.0", + "@chainsafe/libp2p-yamux": "^3.0.5", + "@ipld/dag-pb": "^4.0.2", + "@libp2p/websockets": "^5.0.3", + "aegir": "^38.1.5", + "blockstore-datastore-adapter": "^5.0.0", + "datastore-core": "^8.0.4", + "datastore-fs": "^8.0.0", + "datastore-level": "^9.0.4", + "execa": "^7.0.0", + "go-ipfs": "^0.18.1", + "helia": "~0.0.0", + "ipfs-core": "^0.18.0", + "ipfsd-ctl": "^13.0.0", + "it-all": "^2.0.0", + "it-drain": "^2.0.0", + "kubo-rpc-client": "^3.0.1", + "libp2p": "^0.42.2", + "multiformats": "^11.0.1", + "tinybench": "^2.3.1" + } +} diff --git a/benchmarks/gc/src/README.md b/benchmarks/gc/src/README.md new file mode 100644 index 000000000..f06ea72c8 --- /dev/null +++ b/benchmarks/gc/src/README.md @@ -0,0 +1,47 @@ +# GC Benchmark + +Benchmarks Helia GC performance against js-ipfs and Kubo + +- Removes any existing pins +- Creates 10000 DAGs with two nodes linked to by a root node that is pinned +- Creates 10000 unpinned blocks +- Runs GC to delete the unpinned blocks leaving the others intact + +All three implementations use on-disk block/datastores to ensure a reasonable basis for comparison. + +Warning! It can take a long time with realistic pinset sizes - on the order of a whole day. + +To run: + +1. Add `benchmarks/*` to the `workspaces` entry in the root `package.json` of this repo +3. Run + ```console + $ npm run reset + $ npm i + $ npm run build + $ cd benchmarks/gc + $ npm start + + > benchmarks-gc@1.0.0 start + > npm run build && node dist/src/index.js + + + > benchmarks-gc@1.0.0 build + > aegir build --bundle false + + [14:51:28] tsc [started] + [14:51:33] tsc [completed] + generating Ed25519 keypair... + ┌─────────┬────────────────┬─────────┬───────────┬──────┐ + │ (index) │ Implementation │ ops/s │ ms/op │ runs │ + ├─────────┼────────────────┼─────────┼───────────┼──────┤ + //... results here + ``` + +## Graph + +To output stats for a graph run: + +```console +$ npm run build && node dist/src/graph.js +``` diff --git a/benchmarks/gc/src/graph.ts b/benchmarks/gc/src/graph.ts new file mode 100644 index 000000000..521aae828 --- /dev/null +++ b/benchmarks/gc/src/graph.ts @@ -0,0 +1,18 @@ +import { execa } from 'execa' + +const ITERATIONS = 2 +const INCREMENT = 1000 +const MAX = 10000 + +for (let i = 1; i <= MAX / INCREMENT; i ++) { + await execa('node', ['dist/src/index.js'], { + env: { + ...process.env, + INCREMENT: (i * INCREMENT).toString(), + ITERATIONS: ITERATIONS.toString(), + ITERATION: i.toString() + }, + stdout: 'inherit', + stderr: 'inherit' + }) +} diff --git a/benchmarks/gc/src/helia.ts b/benchmarks/gc/src/helia.ts new file mode 100644 index 000000000..53e93fb9f --- /dev/null +++ b/benchmarks/gc/src/helia.ts @@ -0,0 +1,86 @@ +import { createHelia, DAGWalker } from 'helia' +import { createLibp2p } from 'libp2p' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import type { GcBenchmark } from './index.js' +import * as dagPb from '@ipld/dag-pb' +import all from 'it-all' +import os from 'node:os' +import path from 'node:path' +import { LevelDatastore } from 'datastore-level' +import { BlockstoreDatastoreAdapter } from 'blockstore-datastore-adapter' +import { ShardingDatastore } from 'datastore-core/sharding' +import { NextToLast } from 'datastore-core/shard' +import { FsDatastore } from 'datastore-fs' +import drain from 'it-drain' + +const dagPbWalker: DAGWalker = { + codec: dagPb.code, + async * walk (block) { + const node = dagPb.decode(block) + + yield * node.Links.map(l => l.Hash) + } +} + +export async function createHeliaBenchmark (): Promise { + const repoPath = path.join(os.tmpdir(), `helia-${Math.random()}`) + + const helia = await createHelia({ + blockstore: new BlockstoreDatastoreAdapter( + new ShardingDatastore( + new FsDatastore(`${repoPath}/blocks`, { + extension: '.data' + }), + new NextToLast(2) + ) + ), + datastore: new LevelDatastore(`${repoPath}/data`), + libp2p: await createLibp2p({ + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ] + }), + dagWalkers: [ + dagPbWalker + ], + start: false + }) + + return { + async gc () { + await helia.gc() + }, + async putBlocks (blocks) { + await drain(helia.blockstore.putMany(blocks)) + }, + async pin (cid) { + await helia.pins.add(cid) + }, + async teardown () { + await helia.stop() + }, + async clearPins () { + const pins = await all(helia.pins.ls()) + + for (const pin of pins) { + await helia.pins.rm(pin.cid) + } + + return pins.length + }, + isPinned: (cid) => { + return helia.pins.isPinned(cid) + }, + hasBlock: (cid) => { + return helia.blockstore.has(cid) + } + } +} diff --git a/benchmarks/gc/src/index.ts b/benchmarks/gc/src/index.ts new file mode 100644 index 000000000..e61eaa64a --- /dev/null +++ b/benchmarks/gc/src/index.ts @@ -0,0 +1,193 @@ +import { Bench } from 'tinybench' +import { CID } from 'multiformats/cid' +import { createHeliaBenchmark } from './helia.js' +import { createIpfsBenchmark } from './ipfs.js' +import { createKuboBenchmark } from './kubo.js' +import * as dagPb from '@ipld/dag-pb' +import crypto from 'node:crypto' +import { sha256 } from 'multiformats/hashes/sha2' + +const PINNED_DAG_COUNT = parseInt(process.env.INCREMENT ?? '10000') +const GARBAGE_BLOCK_COUNT = parseInt(process.env.INCREMENT ?? '10000') +const ITERATIONS = parseInt(process.env.ITERATIONS ?? '5') +const RESULT_PRECISION = 2 + +export interface GcBenchmark { + gc: () => Promise + teardown: () => Promise + pin: (cid: CID ) => Promise + putBlocks: (blocks: Array<{ key: CID, value: Uint8Array }>) => Promise + clearPins: () => Promise + isPinned: (cid: CID) => Promise + hasBlock: (cid: CID) => Promise +} + +const blocks: Array<{ key: CID, value: Uint8Array }> = [] +const garbageBlocks: Array<{ key: CID, value: Uint8Array }> = [] +const pins: CID[] = [] + +/** + * Create blocks that will be pinned and/or deleted + */ +async function generateBlocks (): Promise { + // generate DAGs of two blocks linked by a root that will be pinned + for (let i = 0; i < PINNED_DAG_COUNT; i++) { + const block1 = dagPb.encode({ + Data: crypto.randomBytes(5), + Links: [] + }) + const mh1 = await sha256.digest(block1) + const cid1 = CID.createV1(dagPb.code, mh1) + + const block2 = dagPb.encode({ + Data: crypto.randomBytes(5), + Links: [] + }) + const mh2 = await sha256.digest(block2) + const cid2 = CID.createV1(dagPb.code, mh2) + + const block3 = dagPb.encode({ + Links: [{ + Hash: cid1, + Tsize: block1.length + }, { + Hash: cid2, + Tsize: block2.length + }] + }) + const mh3 = await sha256.digest(block3) + const cid3 = CID.createV1(dagPb.code, mh3) + + blocks.push({ key: cid1, value: block1 }) + blocks.push({ key: cid2, value: block2 }) + blocks.push({ key: cid3, value: block3 }) + pins.push(cid3) + } + + // generate garbage blocks that will be deleted + for (let i = 0; i < GARBAGE_BLOCK_COUNT; i++) { + const block = dagPb.encode({ + Data: crypto.randomBytes(5), + Links: [] + }) + const mh = await sha256.digest(block) + const cid = CID.createV1(dagPb.code, mh) + + garbageBlocks.push({ key: cid, value: block }) + } +} + +async function addBlocks (benchmark: GcBenchmark): Promise { + // add all the blocks + await benchmark.putBlocks(blocks) + await benchmark.putBlocks(garbageBlocks) +} + +async function pinBlocks (benchmark: GcBenchmark): Promise { + // add all of the pins + for (const pin of pins) { + await benchmark.pin(pin) + } +} + +const impls: Array<{ name: string, create: () => Promise, results: { gc: number[], clearedPins: number[], addedBlocks: number[], pinnedBlocks: number[] }}> = [{ + name: 'helia', + create: () => createHeliaBenchmark(), + results: { + gc: [], + clearedPins: [], + addedBlocks: [], + pinnedBlocks: [] + } +}, { + name: 'ipfs', + create: () => createIpfsBenchmark(), + results: { + gc: [], + clearedPins: [], + addedBlocks: [], + pinnedBlocks: [] + } +}, { + name: 'kubo', + create: () => createKuboBenchmark(), + results: { + gc: [], + clearedPins: [], + addedBlocks: [], + pinnedBlocks: [] + } +}] + +async function main (): Promise { + let subject: GcBenchmark + + await generateBlocks() + + const suite = new Bench({ + iterations: ITERATIONS, + time: 1 + }) + + for (const impl of impls) { + suite.add(impl.name, async () => { + const start = Date.now() + await subject.gc() + impl.results.gc.push(Date.now() - start) + }, { + beforeAll: async () => { + subject = await impl.create() + }, + beforeEach: async () => { + let start = Date.now() + const pinCount = await subject.clearPins() + + if (pinCount > 0) { + impl.results.clearedPins.push(Date.now() - start) + } + + start = Date.now() + await addBlocks(subject) + impl.results.addedBlocks.push(Date.now() - start) + + start = Date.now() + await pinBlocks(subject) + impl.results.pinnedBlocks.push(Date.now() - start) + }, + afterAll: async () => { + await subject.teardown() + } + }) + } + + await suite.run() + + if (process.env.INCREMENT != null) { + if (process.env.ITERATION === '1') { + console.info('implementation, count, clear pins (ms), add blocks (ms), add pins (ms), gc (ms)') + } + + for (const impl of impls) { + console.info( + `${impl.name},`, + `${process.env.INCREMENT},`, + `${(impl.results.clearedPins.reduce((acc, curr) => acc + curr, 0) / impl.results.clearedPins.length).toFixed(RESULT_PRECISION)},`, + `${(impl.results.addedBlocks.reduce((acc, curr) => acc + curr, 0) / impl.results.addedBlocks.length).toFixed(RESULT_PRECISION)},`, + `${(impl.results.pinnedBlocks.reduce((acc, curr) => acc + curr, 0) / impl.results.pinnedBlocks.length).toFixed(RESULT_PRECISION)},`, + `${(impl.results.gc.reduce((acc, curr) => acc + curr, 0) / impl.results.gc.length).toFixed(RESULT_PRECISION)}`, + ) + } + } else { + console.table(suite.tasks.map(({ name, result }) => ({ + 'Implementation': name, + 'ops/s': result?.hz.toFixed(RESULT_PRECISION), + 'ms/op': result?.period.toFixed(RESULT_PRECISION), + 'runs': result?.samples.length + }))) + } +} + +main().catch(err => { + console.error(err) // eslint-disable-line no-console + process.exit(1) +}) diff --git a/benchmarks/gc/src/ipfs.ts b/benchmarks/gc/src/ipfs.ts new file mode 100644 index 000000000..472e43ae6 --- /dev/null +++ b/benchmarks/gc/src/ipfs.ts @@ -0,0 +1,68 @@ +import { create } from 'ipfs-core' +import drain from 'it-drain' +import type { GcBenchmark } from './index.js' +import all from 'it-all' +import os from 'node:os' +import path from 'node:path' + +export async function createIpfsBenchmark (): Promise { + const repoPath = path.join(os.tmpdir(), `ipfs-${Math.random()}`) + + const ipfs = await create({ + config: { + Addresses: { + Swarm: [] + } + }, + repo: repoPath, + start: false, + init: { + emptyRepo: true + } + }) + + return { + async gc () { + await drain(ipfs.repo.gc()) + }, + async putBlocks (blocks) { + for (const { value } of blocks) { + await ipfs.block.put(value) + } + }, + async pin (cid) { + await ipfs.pin.add(cid) + }, + async teardown () { + await ipfs.stop() + }, + async clearPins () { + const pins = await all(ipfs.pin.ls()) + + for (const pin of pins) { + if (pin.type !== 'recursive' && pin.type !== 'direct') { + continue + } + + await ipfs.pin.rm(pin.cid) + } + + return pins.length + }, + isPinned: async (cid) => { + const result = await all(ipfs.pin.ls({ + paths: cid + })) + + return result[0].type.includes('direct') || result[0].type.includes('indirect') || result[0].type.includes('recursive') + }, + hasBlock: async (cid) => { + try { + await ipfs.block.get(cid) + return true + } catch { + return false + } + } + } +} diff --git a/benchmarks/gc/src/kubo.ts b/benchmarks/gc/src/kubo.ts new file mode 100644 index 000000000..4819f283b --- /dev/null +++ b/benchmarks/gc/src/kubo.ts @@ -0,0 +1,73 @@ +import drain from 'it-drain' +import type { GcBenchmark } from './index.js' +import all from 'it-all' +// @ts-expect-error no types +import * as goIpfs from 'go-ipfs' +import * as goRpcClient from 'kubo-rpc-client' + +import { createController } from 'ipfsd-ctl' + +export async function createKuboBenchmark (): Promise { + const controller = await createController({ + type: 'go', + test: true, + ipfsBin: goIpfs.path(), + ipfsHttpModule: goRpcClient, + ipfsOptions: { + init: { + emptyRepo: true + } + } + }) + + return { + async gc () { + await drain(controller.api.repo.gc()) + }, + async putBlocks (blocks) { + for (const { value } of blocks) { + await controller.api.block.put(value) + } + }, + async pin (cid) { + await controller.api.pin.add(cid) + }, + async teardown () { + await controller.stop() + }, + async clearPins () { + const pins = await all(controller.api.pin.ls()) + + for (const pin of pins) { + if (pin.type !== 'recursive' && pin.type !== 'direct') { + continue + } + + await controller.api.pin.rm(pin.cid) + } + + return pins.length + }, + isPinned: async (cid) => { + const result = await all(controller.api.pin.ls({ + paths: cid + })) + + const isPinned = result[0].type.includes('direct') || result[0].type.includes('indirect') || result[0].type.includes('recursive') + + if (!isPinned) { + console.info(result) + } + + return isPinned + }, + hasBlock: async (cid) => { + try { + await controller.api.block.get(cid) + return true + } catch { + return false + } + } + } +} diff --git a/benchmarks/gc/tsconfig.json b/benchmarks/gc/tsconfig.json new file mode 100644 index 000000000..ec97ffe82 --- /dev/null +++ b/benchmarks/gc/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ], + "references": [ + { + "path": "../../packages/helia" + } + ] +} diff --git a/packages/helia/package.json b/packages/helia/package.json index 3f9fdc1c6..7ac7e641c 100644 --- a/packages/helia/package.json +++ b/packages/helia/package.json @@ -139,21 +139,32 @@ }, "dependencies": { "@helia/interface": "~0.0.0", + "@ipld/dag-pb": "^4.0.2", "@libp2p/interface-libp2p": "^1.1.0", "@libp2p/interfaces": "^3.3.1", "blockstore-core": "^3.0.0", + "cborg": "^1.10.0", "interface-blockstore": "^4.0.1", "interface-datastore": "^7.0.3", "interface-store": "^3.0.4", "ipfs-bitswap": "^16.0.0", + "it-all": "^2.0.0", + "it-drain": "^2.0.0", "it-filter": "^2.0.0", "it-merge": "^2.0.0", "it-pushable": "^3.1.2", - "multiformats": "^11.0.1" + "mortice": "^3.0.1", + "multiformats": "^11.0.1", + "p-defer": "^4.0.0", + "p-queue": "^7.3.4", + "progress-events": "^1.0.0", + "uint8arrays": "^4.0.3" }, "devDependencies": { "@chainsafe/libp2p-noise": "^11.0.0", "@chainsafe/libp2p-yamux": "^3.0.5", + "@ipld/dag-cbor": "^9.0.0", + "@ipld/dag-json": "^10.0.1", "@libp2p/websockets": "^5.0.3", "aegir": "^38.1.0", "datastore-core": "^8.0.4", diff --git a/packages/helia/src/helia.ts b/packages/helia/src/helia.ts index 621db2e80..9237961f4 100644 --- a/packages/helia/src/helia.ts +++ b/packages/helia/src/helia.ts @@ -1,6 +1,5 @@ -import type { Helia, InfoResponse } from '@helia/interface' +import type { GCOptions, Helia, InfoResponse } from '@helia/interface' import type { Libp2p } from '@libp2p/interface-libp2p' -import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' import { identity } from 'multiformats/hashes/identity' import { sha256, sha512 } from 'multiformats/hashes/sha2' @@ -8,11 +7,17 @@ import type { MultihashHasher } from 'multiformats/hashes/interface' import type { HeliaInit } from '.' import { Bitswap, createBitswap } from 'ipfs-bitswap' import { BlockStorage } from './storage.js' +import type { Pins } from '@helia/interface/pins' +import { PinsImpl } from './pins.js' +import { assertDatastoreVersionIsCurrent } from './utils/datastore-version.js' +import drain from 'it-drain' +import { CustomProgressEvent } from 'progress-events' export class HeliaImpl implements Helia { public libp2p: Libp2p - public blockstore: Blockstore + public blockstore: BlockStorage public datastore: Datastore + public pins: Pins #bitswap: Bitswap @@ -24,6 +29,8 @@ export class HeliaImpl implements Helia { ...(init.hashers ?? []) ] + this.pins = new PinsImpl(init.datastore, init.blockstore, init.dagWalkers ?? []) + this.#bitswap = createBitswap(init.libp2p, init.blockstore, { hashLoader: { getHasher: async (codecOrName: string | number) => { @@ -41,11 +48,13 @@ export class HeliaImpl implements Helia { }) this.libp2p = init.libp2p - this.blockstore = new BlockStorage(init.blockstore, this.#bitswap) + this.blockstore = new BlockStorage(init.blockstore, this.#bitswap, this.pins) this.datastore = init.datastore } async start (): Promise { + await assertDatastoreVersionIsCurrent(this.datastore) + this.#bitswap.start() await this.libp2p.start() } @@ -65,4 +74,27 @@ export class HeliaImpl implements Helia { status: this.libp2p.isStarted() ? 'running' : 'stopped' } } + + async gc (options: GCOptions = {}): Promise { + const releaseLock = await this.blockstore.lock.writeLock() + + try { + const helia = this + const blockstore = this.blockstore.unwrap() + + await drain(blockstore.deleteMany((async function * () { + for await (const cid of blockstore.queryKeys({})) { + if (await helia.pins.isPinned(cid, options)) { + continue + } + + yield cid + + options.onProgress?.(new CustomProgressEvent('helia:gc:deleted', cid)) + } + }()))) + } finally { + releaseLock() + } + } } diff --git a/packages/helia/src/index.ts b/packages/helia/src/index.ts index 419b036fb..60e83a807 100644 --- a/packages/helia/src/index.ts +++ b/packages/helia/src/index.ts @@ -29,9 +29,18 @@ import type { Helia } from '@helia/interface' import type { Libp2p } from '@libp2p/interface-libp2p' import type { Blockstore } from 'interface-blockstore' import type { Datastore } from 'interface-datastore' +import type { CID } from 'multiformats/cid' import type { MultihashHasher } from 'multiformats/hashes/interface' import { HeliaImpl } from './helia.js' +/** + * DAGWalkers take a block and yield CIDs encoded in that block + */ +export interface DAGWalker { + codec: number + walk: (block: Uint8Array) => AsyncGenerator +} + /** * Options used to create a Helia node. */ @@ -58,6 +67,13 @@ export interface HeliaInit { */ hashers?: MultihashHasher[] + /** + * In order to pin CIDs that correspond to a DAG, it's necessary to know + * how to traverse that DAG. DAGWalkers take a block and yield any CIDs + * encoded within that block. + */ + dagWalkers?: DAGWalker[] + /** * Pass `false` to not start the helia node */ diff --git a/packages/helia/src/pins.ts b/packages/helia/src/pins.ts new file mode 100644 index 000000000..ca53f65a9 --- /dev/null +++ b/packages/helia/src/pins.ts @@ -0,0 +1,238 @@ +import type { AddOptions, IsPinnedOptions, LsOptions, Pin, Pins, RmOptions } from '@helia/interface/pins' +import { Datastore, Key } from 'interface-datastore' +import { CID, Version } from 'multiformats/cid' +import * as cborg from 'cborg' +import { base36 } from 'multiformats/bases/base36' +import type { Blockstore } from 'interface-blockstore' +import PQueue from 'p-queue' +import type { AbortOptions } from '@libp2p/interfaces' +import { equals as uint8ArrayEquals } from 'uint8arrays/equals' +import defer from 'p-defer' +import type { DAGWalker } from './index.js' +import { cborWalker, dagPbWalker, jsonWalker, rawWalker } from './utils/dag-walkers.js' + +const DEFAULT_DAG_WALKERS = [ + rawWalker, + dagPbWalker, + cborWalker, + jsonWalker +] + +interface DatastorePin { + /** + * 0 for a direct pin or an arbitrary (+ve, whole) number or Infinity + */ + depth: number + + /** + * User-specific metadata for the pin + */ + metadata: Record +} + +interface DatastorePinnedBlock { + pinCount: number + pinnedBy: Uint8Array[] +} + +const DATASTORE_PIN_PREFIX = '/pin/' +const DATASTORE_BLOCK_PREFIX = '/pinned-block/' +const DATASTORE_ENCODING = base36 +// const DAG_WALK_MAX_QUEUE_LENGTH = 10 +const DAG_WALK_QUEUE_CONCURRENCY = 1 + +interface WalkDagOptions extends AbortOptions { + depth: number +} + +function toDSKey (cid: CID): Key { + if (cid.version === 0) { + cid = cid.toV1() + } + + return new Key(`${DATASTORE_PIN_PREFIX}${cid.toString(DATASTORE_ENCODING)}`) +} + +export class PinsImpl implements Pins { + private readonly datastore: Datastore + private readonly blockstore: Blockstore + private dagWalkers: Record + + constructor (datastore: Datastore, blockstore: Blockstore, dagWalkers: DAGWalker[]) { + this.datastore = datastore + this.blockstore = blockstore + this.dagWalkers = {} + + ;[...DEFAULT_DAG_WALKERS, ...dagWalkers].forEach(dagWalker => { + this.dagWalkers[dagWalker.codec] = dagWalker + }) + } + + async add (cid: CID, options: AddOptions = {}): Promise { + const pinKey = toDSKey(cid) + + if (await this.datastore.has(pinKey)) { + throw new Error('Already pinned') + } + + const depth = Math.round(options.depth ?? Infinity) + + if (depth < 0) { + throw new Error('Depth must be greater than or equal to 0') + } + + // use a queue to walk the DAG instead of recursion so we can traverse very large DAGs + const queue = new PQueue({ + concurrency: DAG_WALK_QUEUE_CONCURRENCY + }) + void queue.add(async () => { + await this.#walkDag(cid, queue, (pinnedBlock) => { + // do not update pinned block if this block is already pinned by this CID + if (pinnedBlock.pinnedBy.find(c => uint8ArrayEquals(c, cid.bytes)) != null) { + return + } + + pinnedBlock.pinCount++ + pinnedBlock.pinnedBy.push(cid.bytes) + }, { + ...options, + depth + }) + }) + + // if a job in the queue errors, throw that error + const deferred = defer() + + queue.on('error', (err) => { + queue.clear() + deferred.reject(err) + }) + + // wait for the queue to complete or error + await Promise.race([ + queue.onIdle(), + deferred.promise + ]) + + const pin: DatastorePin = { + depth, + metadata: options.metadata ?? {} + } + + await this.datastore.put(pinKey, cborg.encode(pin), options) + + return { + cid, + ...pin + } + } + + /** + * Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore + * and update the pin count for them + */ + async #walkDag (cid: CID, queue: PQueue, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: WalkDagOptions): Promise { + if (options.depth === -1) { + return + } + + const dagWalker = this.dagWalkers[cid.code] + + if (dagWalker == null) { + throw new Error(`No dag walker found for cid codec ${cid.code}`) + } + + const block = await this.blockstore.get(cid) + + await this.#updatePinnedBlock(cid, withPinnedBlock, options) + + // walk dag, ensure all blocks are present + for await (const cid of dagWalker.walk(block)) { + void queue.add(async () => { + await this.#walkDag(cid, queue, withPinnedBlock, { + ...options, + depth: options.depth - 1 + }) + }) + } + } + + /** + * Update the pin count for the CID + */ + async #updatePinnedBlock (cid: CID, withPinnedBlock: (pinnedBlock: DatastorePinnedBlock) => void, options: AbortOptions): Promise { + const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`) + + let pinnedBlock: DatastorePinnedBlock = { + pinCount: 0, + pinnedBy: [] + } + + try { + pinnedBlock = cborg.decode(await this.datastore.get(blockKey, options)) + } catch (err: any) { + if (err.code !== 'ERR_NOT_FOUND') { + throw err + } + } + + withPinnedBlock(pinnedBlock) + + if (pinnedBlock.pinCount === 0) { + if (await this.datastore.has(blockKey)) { + await this.datastore.delete(blockKey) + return + } + } + + await this.datastore.put(blockKey, cborg.encode(pinnedBlock), options) + } + + async rm (cid: CID, options: RmOptions = {}): Promise { + const pinKey = toDSKey(cid) + const buf = await this.datastore.get(pinKey, options) + const pin = cborg.decode(buf) + + await this.datastore.delete(pinKey, options) + + // use a queue to walk the DAG instead of recursion so we can traverse very large DAGs + const queue = new PQueue({ + concurrency: DAG_WALK_QUEUE_CONCURRENCY + }) + void queue.add(async () => { + await this.#walkDag(cid, queue, (pinnedBlock) => { + pinnedBlock.pinCount-- + pinnedBlock.pinnedBy = pinnedBlock.pinnedBy.filter(c => uint8ArrayEquals(c, cid.bytes)) + }, { + ...options, + depth: pin.depth + }) + }) + await queue.onIdle() + + return { + cid, + ...pin + } + } + + async * ls (options: LsOptions = {}): AsyncGenerator { + for await (const { key, value } of this.datastore.query({ + prefix: DATASTORE_PIN_PREFIX + (options.cid != null ? `${options.cid.toString(base36)}` : '') + }, options)) { + const cid = CID.parse(key.toString().substring(5), base36) + const pin = cborg.decode(value) + + yield { + cid, + ...pin + } + } + } + + async isPinned (cid: CID, options: IsPinnedOptions = {}): Promise { + const blockKey = new Key(`${DATASTORE_BLOCK_PREFIX}${DATASTORE_ENCODING.encode(cid.multihash.bytes)}`) + + return await this.datastore.has(blockKey, options) + } +} diff --git a/packages/helia/src/storage.ts b/packages/helia/src/storage.ts index 59f8b7c37..80c4dce56 100644 --- a/packages/helia/src/storage.ts +++ b/packages/helia/src/storage.ts @@ -7,6 +7,9 @@ import type { Bitswap } from 'ipfs-bitswap' import type { CID } from 'multiformats/cid' import type { AbortOptions } from '@libp2p/interfaces' import type { AwaitIterable } from 'interface-store' +import type { Mortice } from 'mortice' +import createMortice from 'mortice' +import type { Pins } from '@helia/interface/pins' export interface BlockStorageOptions extends AbortOptions { progress?: (evt: Event) => void @@ -18,17 +21,21 @@ export interface BlockStorageOptions extends AbortOptions { * not present Bitswap will be used to fetch them from network peers. */ export class BlockStorage extends BaseBlockstore implements Blockstore { + public lock: Mortice private readonly child: Blockstore private readonly bitswap: Bitswap + private readonly pins: Pins /** * Create a new BlockStorage */ - constructor (blockstore: Blockstore, bitswap: Bitswap) { + constructor (blockstore: Blockstore, bitswap: Bitswap, pins: Pins) { super() this.child = blockstore this.bitswap = bitswap + this.pins = pins + this.lock = createMortice() } async open (): Promise { @@ -47,14 +54,16 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { * Put a block to the underlying datastore */ async put (cid: CID, block: Uint8Array, options: AbortOptions = {}): Promise { - if (await this.has(cid)) { - return - } + const releaseLock = await this.lock.writeLock() - if (this.bitswap.isStarted()) { - await this.bitswap.put(cid, block, options) - } else { - await this.child.put(cid, block, options) + try { + if (this.bitswap.isStarted()) { + await this.bitswap.put(cid, block, options) + } else { + await this.child.put(cid, block, options) + } + } finally { + releaseLock() } } @@ -62,12 +71,18 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { * Put a multiple blocks to the underlying datastore */ async * putMany (blocks: AwaitIterable<{ key: CID, value: Uint8Array }>, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> { - const missingBlocks = filter(blocks, async ({ key }) => { return !(await this.has(key)) }) + const releaseLock = await this.lock.writeLock() + + try { + const missingBlocks = filter(blocks, async ({ key }) => { return !(await this.child.has(key)) }) - if (this.bitswap.isStarted()) { - yield * this.bitswap.putMany(missingBlocks, options) - } else { - yield * this.child.putMany(missingBlocks, options) + if (this.bitswap.isStarted()) { + yield * this.bitswap.putMany(missingBlocks, options) + } else { + yield * this.child.putMany(missingBlocks, options) + } + } finally { + releaseLock() } } @@ -75,10 +90,16 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { * Get a block by cid */ async get (cid: CID, options: BlockStorageOptions = {}): Promise { - if (!(await this.has(cid)) && this.bitswap.isStarted()) { - return await this.bitswap.get(cid, options) - } else { - return await this.child.get(cid, options) + const releaseLock = await this.lock.readLock() + + try { + if (!(await this.has(cid)) && this.bitswap.isStarted()) { + return await this.bitswap.get(cid, options) + } else { + return await this.child.get(cid, options) + } + } finally { + releaseLock() } } @@ -86,53 +107,103 @@ export class BlockStorage extends BaseBlockstore implements Blockstore { * Get multiple blocks back from an array of cids */ async * getMany (cids: AwaitIterable, options: BlockStorageOptions = {}): AsyncGenerator { - const getFromBitswap = pushable({ objectMode: true }) - const getFromChild = pushable({ objectMode: true }) - - void Promise.resolve().then(async () => { - for await (const cid of cids) { - if (!(await this.has(cid)) && this.bitswap.isStarted()) { - getFromBitswap.push(cid) - } else { - getFromChild.push(cid) + const releaseLock = await this.lock.readLock() + + try { + const getFromBitswap = pushable({ objectMode: true }) + const getFromChild = pushable({ objectMode: true }) + + void Promise.resolve().then(async () => { + for await (const cid of cids) { + if (!(await this.has(cid)) && this.bitswap.isStarted()) { + getFromBitswap.push(cid) + } else { + getFromChild.push(cid) + } } - } - - getFromBitswap.end() - getFromChild.end() - }).catch(err => { - getFromBitswap.throw(err) - }) - yield * merge( - this.bitswap.getMany(getFromBitswap, options), - this.child.getMany(getFromChild, options) - ) + getFromBitswap.end() + getFromChild.end() + }).catch(err => { + getFromBitswap.throw(err) + }) + + yield * merge( + this.bitswap.getMany(getFromBitswap, options), + this.child.getMany(getFromChild, options) + ) + } finally { + releaseLock() + } } /** * Delete a block from the blockstore */ async delete (cid: CID, options: AbortOptions = {}): Promise { - await this.child.delete(cid, options) + const releaseLock = await this.lock.writeLock() + + try { + if (await this.pins.isPinned(cid)) { + throw new Error('CID was pinned') + } + + await this.child.delete(cid, options) + } finally { + releaseLock() + } } /** * Delete multiple blocks from the blockstore */ async * deleteMany (cids: AwaitIterable, options: AbortOptions = {}): AsyncGenerator { - yield * this.child.deleteMany(cids, options) + const releaseLock = await this.lock.writeLock() + + try { + const storage = this + + yield * this.child.deleteMany((async function * () { + for await (const cid of cids) { + if (await storage.pins.isPinned(cid)) { + throw new Error('CID was pinned') + } + + yield cid + } + }()), options) + } finally { + releaseLock() + } } async has (cid: CID, options: AbortOptions = {}): Promise { - return await this.child.has(cid, options) + const releaseLock = await this.lock.readLock() + + try { + return await this.child.has(cid, options) + } finally { + releaseLock() + } } async * query (q: Query, options: AbortOptions = {}): AsyncGenerator<{ key: CID, value: Uint8Array }, void, undefined> { - yield * this.child.query(q, options) + const releaseLock = await this.lock.readLock() + + try { + yield * this.child.query(q, options) + } finally { + releaseLock() + } } async * queryKeys (q: KeyQuery, options: AbortOptions = {}): AsyncGenerator { - yield * this.child.queryKeys(q, options) + const releaseLock = await this.lock.readLock() + + try { + yield * this.child.queryKeys(q, options) + } finally { + releaseLock() + } } } diff --git a/packages/helia/src/utils/dag-walkers.ts b/packages/helia/src/utils/dag-walkers.ts new file mode 100644 index 000000000..989f29ad4 --- /dev/null +++ b/packages/helia/src/utils/dag-walkers.ts @@ -0,0 +1,169 @@ +/* eslint max-depth: ["error", 7] */ + +import * as dagPb from '@ipld/dag-pb' +import * as cborg from 'cborg' +import { Type, Token } from 'cborg' +import * as cborgJson from 'cborg/json' +import type { DAGWalker } from '../index.js' +import * as raw from 'multiformats/codecs/raw' +import { CID } from 'multiformats' +import { base64 } from 'multiformats/bases/base64' + +/** + * Dag walker for dag-pb CIDs + */ +export const dagPbWalker: DAGWalker = { + codec: dagPb.code, + async * walk (block) { + const node = dagPb.decode(block) + + yield * node.Links.map(l => l.Hash) + } +} + +/** + * Dag walker for raw CIDs + */ +export const rawWalker: DAGWalker = { + codec: raw.code, + async * walk () { + // no embedded CIDs in a raw block + } +} + +// https://github.com/ipfs/go-ipfs/issues/3570#issuecomment-273931692 +const CID_TAG = 42 + +/** + * Dag walker for dag-cbor CIDs. Does not actually use dag-cbor since + * all we are interested in is extracting the the CIDs from the block + * so we can just use cborg for that. + */ +export const cborWalker: DAGWalker = { + codec: 0x71, + async * walk (block) { + const cids: CID[] = [] + const tags: cborg.TagDecoder[] = [] + tags[CID_TAG] = (bytes) => { + if (bytes[0] !== 0) { + throw new Error('Invalid CID for CBOR tag 42; expected leading 0x00') + } + + const cid = CID.decode(bytes.subarray(1)) // ignore leading 0x00 + + cids.push(cid) + + return cid + } + + cborg.decode(block, { + tags + }) + + yield * cids + } +} + +/** + * Borrowed from @ipld/dag-json + */ +class DagJsonTokenizer extends cborgJson.Tokenizer { + private readonly tokenBuffer: cborg.Token[] + + constructor (data: Uint8Array, options?: cborg.DecodeOptions) { + super(data, options) + + this.tokenBuffer = [] + } + + done (): boolean { + return this.tokenBuffer.length === 0 && super.done() + } + + _next (): cborg.Token { + if (this.tokenBuffer.length > 0) { + // @ts-expect-error https://github.com/Microsoft/TypeScript/issues/30406 + return this.tokenBuffer.pop() + } + return super.next() + } + + /** + * Implements rules outlined in https://github.com/ipld/specs/pull/356 + */ + next (): cborg.Token { + const token = this._next() + + if (token.type === Type.map) { + const keyToken = this._next() + if (keyToken.type === Type.string && keyToken.value === '/') { + const valueToken = this._next() + if (valueToken.type === Type.string) { // *must* be a CID + const breakToken = this._next() // swallow the end-of-map token + if (breakToken.type !== Type.break) { + throw new Error('Invalid encoded CID form') + } + this.tokenBuffer.push(valueToken) // CID.parse will pick this up after our tag token + return new Token(Type.tag, 42, 0) + } + if (valueToken.type === Type.map) { + const innerKeyToken = this._next() + if (innerKeyToken.type === Type.string && innerKeyToken.value === 'bytes') { + const innerValueToken = this._next() + if (innerValueToken.type === Type.string) { // *must* be Bytes + for (let i = 0; i < 2; i++) { + const breakToken = this._next() // swallow two end-of-map tokens + if (breakToken.type !== Type.break) { + throw new Error('Invalid encoded Bytes form') + } + } + const bytes = base64.decode(`m${innerValueToken.value}`) + return new Token(Type.bytes, bytes, innerValueToken.value.length) + } + this.tokenBuffer.push(innerValueToken) // bail + } + this.tokenBuffer.push(innerKeyToken) // bail + } + this.tokenBuffer.push(valueToken) // bail + } + this.tokenBuffer.push(keyToken) // bail + } + return token + } +} + +/** + * Dag walker for dag-json CIDs. Does not actually use dag-json since + * all we are interested in is extracting the the CIDs from the block + * so we can just use cborg/json for that. + */ +export const jsonWalker: DAGWalker = { + codec: 0x0129, + async * walk (block) { + const cids: CID[] = [] + const tags: cborg.TagDecoder[] = [] + tags[CID_TAG] = (string) => { + const cid = CID.parse(string) + + cids.push(cid) + + return cid + } + + cborgJson.decode(block, { + tags, + tokenizer: new DagJsonTokenizer(block, { + tags, + allowIndefinite: true, + allowUndefined: true, + allowNaN: true, + allowInfinity: true, + allowBigInt: true, + strict: false, + rejectDuplicateMapKeys: false + }) + }) + + yield * cids + } +} diff --git a/packages/helia/src/utils/datastore-version.ts b/packages/helia/src/utils/datastore-version.ts new file mode 100644 index 000000000..3b16b7d9c --- /dev/null +++ b/packages/helia/src/utils/datastore-version.ts @@ -0,0 +1,23 @@ +import { Datastore, Key } from 'interface-datastore' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' + +const DS_VERSION_KEY = new Key('/version') +const CURRENT_VERSION = 1 + +export async function assertDatastoreVersionIsCurrent (datastore: Datastore): Promise { + if (!(await datastore.has(DS_VERSION_KEY))) { + await datastore.put(DS_VERSION_KEY, uint8ArrayFromString(`${CURRENT_VERSION}`)) + + return + } + + const buf = await datastore.get(DS_VERSION_KEY) + const str = uint8ArrayToString(buf) + const version = parseInt(str, 10) + + if (version !== CURRENT_VERSION) { + // TODO: write migrations when we break compatibility - for an example, see https://github.com/ipfs/js-ipfs-repo/tree/master/packages/ipfs-repo-migrations + throw new Error('Unknown datastore version, a datastore migration may be required') + } +} diff --git a/packages/helia/test/fixtures/create-block.ts b/packages/helia/test/fixtures/create-block.ts new file mode 100644 index 000000000..1d13b6295 --- /dev/null +++ b/packages/helia/test/fixtures/create-block.ts @@ -0,0 +1,12 @@ +import type { Blockstore } from 'blockstore-core/dist/src/base' +import { CID } from 'multiformats/cid' +import { sha256 } from 'multiformats/hashes/sha2' + +export async function createBlock (codec: Codec, block: Uint8Array, blockstore: Blockstore): Promise> { + const mh = await sha256.digest(block) + const cid = CID.createV1(codec, mh) + + await blockstore.put(cid, block) + + return cid +} diff --git a/packages/helia/test/fixtures/create-dag.ts b/packages/helia/test/fixtures/create-dag.ts new file mode 100644 index 000000000..ae41d98bb --- /dev/null +++ b/packages/helia/test/fixtures/create-dag.ts @@ -0,0 +1,90 @@ +import type { Blockstore } from 'interface-blockstore' +import type { CID } from 'multiformats/cid' +import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' +import { createBlock } from './create-block.js' + +export interface DAGNode { + cid: CID + level: number + links: CID[] +} + +/** + * Creates a DAG for use with the dag walker + * + * E.g. + * + * ``` + * createDag(..., 2, 3) + * + * // creates: + * { + * 'level-0': { + * level: 0, + * cid: CID(baedreibc6d3tr7glrwvflqkvsd7gszrb77kdmhgqukcwx5wnxdnydm3eia), + * links: [ + * CID(baedreicvd6e5yvdg22elvqofvwjtnueennogk24zotw6sbrybrthme7bye), + * CID(baedreicv6ufi5msnpsrqz4ziw2ekxgvopedluzhgpp74ukw6ncjtaypwlu), + * CID(baedreifc4d4pzuzl6rmlf2vinsjcyfnuixogcqcabv65jqtgkbreu6czdm) + * ] + * }, + * 'level-0-0': { + * level: 1, + * cid: CID(baedreicvd6e5yvdg22elvqofvwjtnueennogk24zotw6sbrybrthme7bye), + * links: [ + * CID(baedreifcryimpj7cajld5znmpjis7z5gktwnsjbipbi6zkoe2y634sj5n4), + * CID(baedreid375udk75eukd3xms5nqa7fq45uq2m5xddpf6reamgp3356dqvba), + * CID(baedreig5ajzof2zotjqjaqloaqren2qc2ytuyalhsbf6e7qtpiwqzjyvsu) + * ] + * }, + * 'level-0-1': { + * level: 1, + * cid: CID(baedreicv6ufi5msnpsrqz4ziw2ekxgvopedluzhgpp74ukw6ncjtaypwlu), + * links: [ + * CID(baedreidqugkmpuyh3klk3ediwwltqfuapesi2dr65k2bkyokhrp4m5cuvq), + * CID(baedreia75iubjvtv4ukrq2qxs6mwo5iyrmn4273hmkuvlbrsw3wwss2zwe), + * CID(baedreibjyashtb6wj4inxvcfmsqbsfm5s375524fqo4liw3cshcmu2x7ma) + * ] + * }, + * 'level-0-2': { + * level: 1, + * cid: CID(baedreifc4d4pzuzl6rmlf2vinsjcyfnuixogcqcabv65jqtgkbreu6czdm), + * links: [ + * CID(baedreibfll7pocwwxysan5yprzywx277zzimmhwqdgrvs7bx3oujqd4xpa), + * CID(baedreig5rrihh454f4qcef476m42aughuz5hf4rcc72damlz6okxdafpxe), + * CID(baedreigytohidtspknvirjkslrox23g3qmye5mrx2wijn6ykrpthyg2kry) + * ] + * } + * } + * ``` + */ +export async function createDag (codec: number, blockstore: Blockstore, depth: number, children: number): Promise> { + const dag: Record = {} + const root = await createBlock(codec, uint8arrayFromString('level-0'), blockstore) + + await addChildren(root, 'level', 0, 0, depth, children, dag, codec, blockstore) + + return dag +} + +async function addChildren (cid: CID, name: string, level: number, index: number, depth: number, children: number, dag: Record, codec: number, blockstore: Blockstore): Promise { + if (depth === 0) { + return + } + + name = `${name}-${index}` + + dag[name] = { + level, + cid, + links: [] + } + + for (let i = 0; i < children; i++) { + const subChild = await createBlock(codec, uint8arrayFromString(`${name}-${i}`), blockstore) + + dag[name].links.push(subChild) + + await addChildren(subChild, name, level + 1, index + i, depth - 1, children, dag, codec, blockstore) + } +} diff --git a/packages/helia/test/fixtures/dag-walker.ts b/packages/helia/test/fixtures/dag-walker.ts new file mode 100644 index 000000000..b726d01cc --- /dev/null +++ b/packages/helia/test/fixtures/dag-walker.ts @@ -0,0 +1,14 @@ +import type { DAGWalker } from '../../src/index.js' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import type { DAGNode } from './create-dag.js' + +export function dagWalker (codec: number, dag: Record): DAGWalker { + return { + codec, + async * walk (block) { + const node = dag[uint8ArrayToString(block)] ?? { links: [] } + + yield * node.links + } + } +} diff --git a/packages/helia/test/gc.spec.ts b/packages/helia/test/gc.spec.ts new file mode 100644 index 000000000..6b5f6bfd7 --- /dev/null +++ b/packages/helia/test/gc.spec.ts @@ -0,0 +1,171 @@ +/* eslint-env mocha */ +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import { MemoryDatastore } from 'datastore-core' +import { createLibp2p } from 'libp2p' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { createHelia } from '../src/index.js' +import type { Helia } from '@helia/interface' +import * as raw from 'multiformats/codecs/raw' +import { createBlock } from './fixtures/create-block.js' +import * as dagPb from '@ipld/dag-pb' +import * as dagCbor from '@ipld/dag-cbor' +import * as dagJson from '@ipld/dag-json' + +describe('gc', () => { + let helia: Helia + + beforeEach(async () => { + helia = await createHelia({ + datastore: new MemoryDatastore(), + blockstore: new MemoryBlockstore(), + libp2p: await createLibp2p({ + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ] + }) + }) + }) + + afterEach(async () => { + if (helia != null) { + await helia.stop() + } + }) + + it('pins a dag-pb node and does not garbage collect it or it\'s children', async () => { + const child1 = await createBlock(dagPb.code, dagPb.encode({ + Data: Uint8Array.from([0, 1, 2, 3]), + Links: [] + }), helia.blockstore) + const child2 = await createBlock(dagPb.code, dagPb.encode({ + Data: Uint8Array.from([4, 5, 6, 7]), + Links: [] + }), helia.blockstore) + + const node = await createBlock(dagPb.code, dagPb.encode({ + Links: [{ + Hash: child1, + Name: 'child1' + }, { + Hash: child2, + Name: 'child2' + }] + }), helia.blockstore) + + await helia.pins.add(node) + + // this block will be garbage collected + const doomed = await createBlock(dagPb.code, dagPb.encode({ + Data: Uint8Array.from([8, 9, 0, 1]), + Links: [] + }), helia.blockstore) + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.true() + + await helia.gc() + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.false() + }) + + it('pins a dag-cbor node and does not garbage collect it or it\'s children', async () => { + const child1 = await createBlock(dagCbor.code, dagCbor.encode({ + foo: 'bar' + }), helia.blockstore) + const child2 = await createBlock(dagCbor.code, dagCbor.encode({ + baz: 'qux' + }), helia.blockstore) + + const node = await createBlock(dagCbor.code, dagCbor.encode({ + children: [ + child1, + child2 + ] + }), helia.blockstore) + + await helia.pins.add(node) + + // this block will be garbage collected + const doomed = await createBlock(dagCbor.code, dagJson.encode({ + quux: 'garply' + }), helia.blockstore) + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.true() + + await helia.gc() + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.false() + }) + + it('pins a dag-json node and does not garbage collect it or it\'s children', async () => { + const child1 = await createBlock(dagJson.code, dagJson.encode({ + foo: 'bar' + }), helia.blockstore) + const child2 = await createBlock(dagJson.code, dagJson.encode({ + baz: 'qux' + }), helia.blockstore) + + const node = await createBlock(dagJson.code, dagJson.encode({ + children: [ + child1, + child2 + ] + }), helia.blockstore) + + await helia.pins.add(node) + + // this block will be garbage collected + const doomed = await createBlock(dagJson.code, dagJson.encode({ + quux: 'garply' + }), helia.blockstore) + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.true() + + await helia.gc() + + await expect(helia.blockstore.has(child1)).to.eventually.be.true() + await expect(helia.blockstore.has(child2)).to.eventually.be.true() + await expect(helia.blockstore.has(node)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.false() + }) + + it('pins a raw node and does not garbage collect it', async () => { + const cid = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + + await helia.pins.add(cid) + + // this block will be garbage collected + const doomed = await createBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) + + await expect(helia.blockstore.has(cid)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.true() + + await helia.gc() + + await expect(helia.blockstore.has(cid)).to.eventually.be.true() + await expect(helia.blockstore.has(doomed)).to.eventually.be.false() + }) +}) diff --git a/packages/helia/test/pins.depth-limited.spec.ts b/packages/helia/test/pins.depth-limited.spec.ts new file mode 100644 index 000000000..b37e13282 --- /dev/null +++ b/packages/helia/test/pins.depth-limited.spec.ts @@ -0,0 +1,117 @@ +/* eslint-env mocha */ +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import { MemoryDatastore } from 'datastore-core' +import { createLibp2p } from 'libp2p' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { createHelia } from '../src/index.js' +import type { Helia } from '@helia/interface' +import { dagWalker } from './fixtures/dag-walker.js' +import { createDag, DAGNode } from './fixtures/create-dag.js' + +const MAX_DEPTH = 3 + +describe('pins (depth limited)', () => { + let helia: Helia + let dag: Record + + beforeEach(async () => { + const blockstore = new MemoryBlockstore() + + // arbitrary CID codec value + const codec = 7 + + // create a DAG, MAX_DEPTH levels deep with each level having three children + dag = await createDag(codec, blockstore, MAX_DEPTH, 3) + + helia = await createHelia({ + datastore: new MemoryDatastore(), + blockstore, + libp2p: await createLibp2p({ + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ] + }), + dagWalkers: [ + dagWalker(codec, dag) + ] + }) + }) + + afterEach(async () => { + if (helia != null) { + await helia.stop() + } + }) + + for (let i = 0; i < MAX_DEPTH; i++) { + describe(`depth ${i}`, () => { // eslint-disable-line no-loop-func + it(`pins a block to depth ${i}`, async () => { + await helia.pins.add(dag['level-0'].cid, { + depth: i + }) + + // only root block should be pinned + for (const [name, node] of Object.entries(dag)) { + if (node.level <= i) { + await expect(helia.pins.isPinned(node.cid)).to.eventually.be.true(`did not pin ${name}`) + } else { + await expect(helia.pins.isPinned(node.cid)).to.eventually.be.false(`pinned ${name}`) + } + + if (node.level > i) { + // no children of this node should be pinned + for (const cid of node.links) { + await expect(helia.pins.isPinned(cid)).to.eventually.be.false(`pinned ${name}`) + } + } + } + }) + + it(`unpins to depth ${i}`, async () => { + await helia.pins.add(dag['level-0'].cid, { + depth: i + }) + await helia.pins.rm(dag['level-0'].cid) + + // no blocks should be pinned + for (const [name, node] of Object.entries(dag)) { + for (const cid of node.links) { + await expect(helia.pins.isPinned(cid)).to.eventually.be.false(`did not unpin ${name}`) + } + } + }) + + it(`does not delete a pinned sub-block under level ${i}`, async () => { + await helia.pins.add(dag['level-0'].cid, { + depth: i + }) + + // no sub blocks should be pinned + for (const [name, node] of Object.entries(dag)) { + if (node.level <= i) { + await expect(helia.blockstore.delete(node.cid)).to.eventually.be.rejected + .with.property('message', 'CID was pinned', `allowed deleting pinned block ${name}`) + } else { + await expect(helia.blockstore.delete(node.cid)).to.eventually.be.undefined(`allowed deleting pinned block ${name}`) + } + + if (node.level > i) { + // no children of this node should be pinned + for (const cid of node.links) { + await expect(helia.blockstore.delete(cid)).to.eventually.be.undefined(`allowed deleting pinned block ${name}`) + } + } + } + }) + }) + } +}) diff --git a/packages/helia/test/pins.recursive.spec.ts b/packages/helia/test/pins.recursive.spec.ts new file mode 100644 index 000000000..4d330bf7d --- /dev/null +++ b/packages/helia/test/pins.recursive.spec.ts @@ -0,0 +1,87 @@ +/* eslint-env mocha */ +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import { MemoryDatastore } from 'datastore-core' +import { createLibp2p } from 'libp2p' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { createHelia } from '../src/index.js' +import type { Helia } from '@helia/interface' +import { dagWalker } from './fixtures/dag-walker.js' +import { createDag, DAGNode } from './fixtures/create-dag.js' + +describe('pins (recursive)', () => { + let helia: Helia + let dag: Record + + beforeEach(async () => { + const blockstore = new MemoryBlockstore() + + // arbitrary CID codec value + const codec = 7 + + // create a DAG, two levels deep with each level having three children + dag = await createDag(codec, blockstore, 2, 3) + + helia = await createHelia({ + datastore: new MemoryDatastore(), + blockstore, + libp2p: await createLibp2p({ + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ] + }), + dagWalkers: [ + dagWalker(codec, dag) + ] + }) + }) + + afterEach(async () => { + if (helia != null) { + await helia.stop() + } + }) + + it('pins a block recursively', async () => { + await helia.pins.add(dag['level-0'].cid) + + // all sub blocks should be pinned + for (const [name, node] of Object.entries(dag)) { + for (const cid of node.links) { + await expect(helia.pins.isPinned(cid)).to.eventually.be.true(`did not pin ${name}`) + } + } + }) + + it('unpins recursively', async () => { + await helia.pins.add(dag['level-0'].cid) + await helia.pins.rm(dag['level-0'].cid) + + // no sub blocks should be pinned + for (const [name, node] of Object.entries(dag)) { + for (const cid of node.links) { + await expect(helia.pins.isPinned(cid)).to.eventually.be.false(`did not unpin ${name}`) + } + } + }) + + it('does not delete a pinned sub-block', async () => { + await helia.pins.add(dag['level-0'].cid) + + // no sub blocks should be pinned + for (const [name, node] of Object.entries(dag)) { + for (const cid of node.links) { + await expect(helia.blockstore.delete(cid)).to.eventually.be.rejected + .with.property('message', 'CID was pinned', `allowed deleting pinned block ${name}`) + } + } + }) +}) diff --git a/packages/helia/test/pins.spec.ts b/packages/helia/test/pins.spec.ts new file mode 100644 index 000000000..b7a8afdf6 --- /dev/null +++ b/packages/helia/test/pins.spec.ts @@ -0,0 +1,139 @@ +/* eslint-env mocha */ +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import { MemoryDatastore } from 'datastore-core' +import { createLibp2p } from 'libp2p' +import { webSockets } from '@libp2p/websockets' +import { noise } from '@chainsafe/libp2p-noise' +import { yamux } from '@chainsafe/libp2p-yamux' +import { createHelia } from '../src/index.js' +import type { Helia } from '@helia/interface' +import { CID } from 'multiformats/cid' +import * as raw from 'multiformats/codecs/raw' +import { createBlock } from './fixtures/create-block.js' +import all from 'it-all' + +describe('pins', () => { + let helia: Helia + + beforeEach(async () => { + helia = await createHelia({ + datastore: new MemoryDatastore(), + blockstore: new MemoryBlockstore(), + libp2p: await createLibp2p({ + transports: [ + webSockets() + ], + connectionEncryption: [ + noise() + ], + streamMuxers: [ + yamux() + ] + }) + }) + }) + + afterEach(async () => { + if (helia != null) { + await helia.stop() + } + }) + + it('pins a block', async () => { + const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV0 = CID.createV0(cidV1.multihash) + + await helia.pins.add(cidV1) + + await expect(helia.pins.isPinned(cidV1)).to.eventually.be.true('did not pin v1 CID') + await expect(helia.pins.isPinned(cidV0)).to.eventually.be.true('did not pin v0 CID') + }) + + it('unpins a block', async () => { + const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cidV0 = CID.createV0(cidV1.multihash) + + await helia.pins.add(cidV1) + + await expect(helia.pins.isPinned(cidV1)).to.eventually.be.true('did not pin v1 CID') + await expect(helia.pins.isPinned(cidV0)).to.eventually.be.true('did not pin v0 CID') + + await helia.pins.rm(cidV1) + + await expect(helia.pins.isPinned(cidV1)).to.eventually.be.false('did not unpin v1 CID') + await expect(helia.pins.isPinned(cidV0)).to.eventually.be.false('did not unpin v0 CID') + }) + + it('does not delete a pinned block', async () => { + const cid = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + + await helia.pins.add(cid) + + await expect(helia.blockstore.delete(cid)).to.eventually.be.rejected + .with.property('message', 'CID was pinned') + }) + + it('lists pins created with default args', async () => { + const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + + await helia.pins.add(cidV1) + + const pins = await all(helia.pins.ls()) + + expect(pins).to.have.lengthOf(1) + expect(pins).to.have.nested.property('[0].cid').that.eql(cidV1) + expect(pins).to.have.nested.property('[0].depth', Infinity) + expect(pins).to.have.nested.property('[0].metadata').that.eql({}) + }) + + it('lists pins with depth', async () => { + const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + + await helia.pins.add(cidV1, { + depth: 5 + }) + + const pins = await all(helia.pins.ls()) + + expect(pins).to.have.lengthOf(1) + expect(pins).to.have.nested.property('[0].cid').that.eql(cidV1) + expect(pins).to.have.nested.property('[0].depth', 5) + }) + + it('lists pins with metadata', async () => { + const cidV1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const metadata = { + foo: 'bar', + baz: 5, + qux: false + } + + await helia.pins.add(cidV1, { + metadata + }) + + const pins = await all(helia.pins.ls()) + + expect(pins).to.have.lengthOf(1) + expect(pins).to.have.nested.property('[0].cid').that.eql(cidV1) + expect(pins).to.have.nested.property('[0].metadata').that.eql(metadata) + }) + + it('lists pins directly', async () => { + const cid1 = await createBlock(raw.code, Uint8Array.from([0, 1, 2, 3]), helia.blockstore) + const cid2 = await createBlock(raw.code, Uint8Array.from([4, 5, 6, 7]), helia.blockstore) + + await helia.pins.add(cid1) + await helia.pins.add(cid2) + + const pins = await all(helia.pins.ls({ + cid: cid1 + })) + + expect(pins).to.have.lengthOf(1) + expect(pins).to.have.nested.property('[0].cid').that.eql(cid1) + expect(pins).to.have.nested.property('[0].depth', Infinity) + expect(pins).to.have.nested.property('[0].metadata').that.eql({}) + }) +}) diff --git a/packages/interface/package.json b/packages/interface/package.json index 961402959..833160fcf 100644 --- a/packages/interface/package.json +++ b/packages/interface/package.json @@ -47,9 +47,9 @@ "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" }, - "./errors": { - "types": "./dist/src/errors.d.ts", - "import": "./dist/src/errors.js" + "./pins": { + "types": "./dist/src/pins.d.ts", + "import": "./dist/src/pins.js" } }, "eslintConfig": { @@ -156,7 +156,9 @@ "@libp2p/interfaces": "^3.3.1", "@multiformats/multiaddr": "^11.1.5", "interface-blockstore": "^4.0.1", - "interface-datastore": "^7.0.3" + "interface-datastore": "^7.0.3", + "multiformats": "^11.0.1", + "progress-events": "^1.0.0" }, "devDependencies": { "aegir": "^38.1.0" diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 82239f2ae..92dd9e235 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -20,6 +20,9 @@ import type { AbortOptions } from '@libp2p/interfaces' import type { PeerId } from '@libp2p/interface-peer-id' import type { Multiaddr } from '@multiformats/multiaddr' import type { Datastore } from 'interface-datastore' +import type { Pins } from './pins.js' +import type { ProgressEvent, ProgressOptions } from 'progress-events' +import type { CID } from 'multiformats/cid' /** * The API presented by a Helia node. @@ -40,6 +43,11 @@ export interface Helia { */ datastore: Datastore + /** + * Pinning operations for blocks in the blockstore + */ + pins: Pins + /** * Returns information about this node * @@ -65,6 +73,18 @@ export interface Helia { * Stops the Helia node */ stop: () => Promise + + /** + * Remove any unpinned blocks from the blockstore + */ + gc: (options?: GCOptions) => Promise +} + +export type GcEvents = + ProgressEvent<'helia:gc:deleted', CID> + +export interface GCOptions extends AbortOptions, ProgressOptions { + } export interface InfoOptions extends AbortOptions { diff --git a/packages/interface/src/pins.ts b/packages/interface/src/pins.ts new file mode 100644 index 000000000..f0125cbeb --- /dev/null +++ b/packages/interface/src/pins.ts @@ -0,0 +1,62 @@ +import type { AbortOptions } from '@libp2p/interfaces' +import type { CID } from 'multiformats/cid' +import type { ProgressEvent, ProgressOptions } from 'progress-events' + +export type PinType = 'recursive' | 'direct' | 'indirect' + +export interface Pin { + cid: CID + depth: number + metadata: Record +} + +export type AddPinEvents = + ProgressEvent<'helia:pin:add', unknown> + +export interface AddOptions extends AbortOptions, ProgressOptions { + /** + * How deeply to pin the DAG, defaults to Infinity + */ + depth?: number + + /** + * Optional user-defined metadata to store with the pin + */ + metadata?: Record +} + +export interface RmOptions extends AbortOptions { + +} + +export interface LsOptions extends AbortOptions { + cid?: CID +} + +export interface IsPinnedOptions extends AbortOptions { + +} + +export interface Pins { + /** + * Pin a block in the blockstore. It will not be deleted + * when garbage collection is run. + */ + add: (cid: CID, options?: AddOptions) => Promise + + /** + * Unpin the block that corresponds to the passed CID. The block will + * be deleted when garbage collection is run. + */ + rm: (cid: CID, options?: RmOptions) => Promise + + /** + * List all blocks that have been pinned. + */ + ls: (options?: LsOptions) => AsyncGenerator + + /** + * Return true if the passed CID is pinned + */ + isPinned: (cid: CID, options?: IsPinnedOptions) => Promise +}