From 336d0b9a4b93dad11674a4af4acaca5541bcce40 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 21 Sep 2022 17:30:39 +0100 Subject: [PATCH] fix: OOM on large DAGs (#410) Storing a set of seen CIDs to short-cut DAG traversal while ensuring we have all the blocks in a DAG in the blockstore can cause OOMs for very large DAGs so replace the unbounded `Set` with a `LRU` cache with a configurable maximum size. --- packages/ipfs-repo-migrations/src/index.js | 16 +++++++---- packages/ipfs-repo/package.json | 1 + packages/ipfs-repo/src/pin-manager.js | 31 ++++++++++++++-------- packages/ipfs-repo/test/pins-test.js | 29 ++++++++++++++++++++ 4 files changed, 61 insertions(+), 16 deletions(-) diff --git a/packages/ipfs-repo-migrations/src/index.js b/packages/ipfs-repo-migrations/src/index.js index 0d7f12de..899232fb 100644 --- a/packages/ipfs-repo-migrations/src/index.js +++ b/packages/ipfs-repo-migrations/src/index.js @@ -1,4 +1,4 @@ -/* eslint complexity: ["error", 27] */ +/* eslint complexity: ["error", 28] */ import defaultMigrations from '../migrations/index.js' import * as repoVersion from './repo/version.js' @@ -43,8 +43,11 @@ export function getLatestMigrationVersion (migrations) { * @param {number} toVersion - Version to which the repo should be migrated. * @param {MigrationOptions} [options] - Options for migration */ -export async function migrate (path, backends, repoOptions, toVersion, { ignoreLock = false, onProgress, isDryRun = false, migrations }) { - migrations = migrations || defaultMigrations +export async function migrate (path, backends, repoOptions, toVersion, options = {}) { + const ignoreLock = options.ignoreLock ?? false + const onProgress = options.onProgress + const isDryRun = options.isDryRun ?? false + const migrations = options.migrations ?? defaultMigrations if (!path) { throw new errors.RequiredParameterError('Path argument is required!') @@ -143,8 +146,11 @@ export async function migrate (path, backends, repoOptions, toVersion, { ignoreL * @param {number} toVersion - Version to which the repo will be reverted. * @param {MigrationOptions} [options] - Options for the reversion */ -export async function revert (path, backends, repoOptions, toVersion, { ignoreLock = false, onProgress, isDryRun = false, migrations }) { - migrations = migrations || defaultMigrations +export async function revert (path, backends, repoOptions, toVersion, options = {}) { + const ignoreLock = options.ignoreLock ?? false + const onProgress = options.onProgress + const isDryRun = options.isDryRun ?? false + const migrations = options.migrations ?? defaultMigrations if (!path) { throw new errors.RequiredParameterError('Path argument is required!') diff --git a/packages/ipfs-repo/package.json b/packages/ipfs-repo/package.json index af7d8fa9..d5e05741 100644 --- a/packages/ipfs-repo/package.json +++ b/packages/ipfs-repo/package.json @@ -204,6 +204,7 @@ "multiformats": "^9.0.4", "p-queue": "^7.3.0", "proper-lockfile": "^4.0.0", + "quick-lru": "^6.1.1", "sort-keys": "^5.0.0", "uint8arrays": "^3.0.0" }, diff --git a/packages/ipfs-repo/src/pin-manager.js b/packages/ipfs-repo/src/pin-manager.js index 9decacc9..ff8f8dee 100644 --- a/packages/ipfs-repo/src/pin-manager.js +++ b/packages/ipfs-repo/src/pin-manager.js @@ -13,6 +13,16 @@ import { } from './utils/blockstore.js' import { walkDag } from './utils/walk-dag.js' import { PinTypes } from './pin-types.js' +import QuickLRU from 'quick-lru' + +/** + * @typedef {import('./types').PinType} PinType + * @typedef {import('./types').PinQueryType} PinQueryType + * @typedef {import('multiformats/codecs/interface').BlockCodec} BlockCodec + * @typedef {import('./types').PinOptions} PinOptions + * @typedef {import('./types').AbortOptions} AbortOptions + * @typedef {import('./types').Pins} Pins + */ /** * @typedef {object} PinInternal @@ -23,14 +33,13 @@ import { PinTypes } from './pin-types.js' */ /** - * @typedef {import('./types').PinType} PinType - * @typedef {import('./types').PinQueryType} PinQueryType - * @typedef {import('multiformats/codecs/interface').BlockCodec} BlockCodec - * @typedef {import('./types').PinOptions} PinOptions - * @typedef {import('./types').AbortOptions} AbortOptions - * @typedef {import('./types').Pins} Pins + * @typedef {object} FetchCompleteDagOptions + * @property {AbortSignal} [signal] + * @property {number} [cidCacheMaxSize] */ +const CID_CACHE_MAX_SIZE = 2048 + /** * @param {string} type */ @@ -95,7 +104,7 @@ export class PinManager { /** * @param {CID} cid - * @param {PinOptions & AbortOptions} [options] + * @param {PinOptions & FetchCompleteDagOptions & AbortOptions} [options] */ async pinRecursively (cid, options = {}) { await this.fetchCompleteDag(cid, options) @@ -271,10 +280,10 @@ export class PinManager { /** * @param {CID} cid - * @param {AbortOptions} options + * @param {FetchCompleteDagOptions} [options] */ - async fetchCompleteDag (cid, options) { - const seen = new Set() + async fetchCompleteDag (cid, options = {}) { + const seen = new QuickLRU({ maxSize: options.cidCacheMaxSize ?? CID_CACHE_MAX_SIZE }) /** * @param {CID} cid @@ -285,7 +294,7 @@ export class PinManager { return } - seen.add(cid.toString()) + seen.set(cid.toString(), true) const bytes = await this.blockstore.get(cid, options) const codec = await this.loadCodec(cid.code) diff --git a/packages/ipfs-repo/test/pins-test.js b/packages/ipfs-repo/test/pins-test.js index 67a52b46..6351ea81 100644 --- a/packages/ipfs-repo/test/pins-test.js +++ b/packages/ipfs-repo/test/pins-test.js @@ -9,6 +9,7 @@ import { CID } from 'multiformats/cid' import all from 'it-all' import { PinTypes } from '../src/pin-types.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import Sinon from 'sinon' /** * @param {import('@ipld/dag-pb').PBNode} node @@ -105,6 +106,34 @@ export default (repo) => { expect(pins.filter(p => p.cid.toString() === cid.toString())) .to.have.deep.nested.property('[0].metadata', metadata) }) + + it('does not traverse the same linked node twice', async () => { + // @ts-expect-error blockstore property is private + const getSpy = Sinon.spy(repo.pins.blockstore, 'get') + + const { cid: childCid, buf: childBuf } = await createDagPbNode() + await repo.blocks.put(childCid, childBuf) + + // create a root block with duplicate links to the same block + const { cid: rootCid, buf: rootBuf } = await createDagPbNode({ + Links: [{ + Name: 'child-1', + Tsize: childBuf.byteLength, + Hash: childCid + }, { + Name: 'child-2', + Tsize: childBuf.byteLength, + Hash: childCid + }] + }) + await repo.blocks.put(rootCid, rootBuf) + + await repo.pins.pinRecursively(rootCid) + + expect(getSpy.callCount).to.equal(2, 'should only have loaded the child block once') + expect(getSpy.getCall(0).args[0]).to.deep.equal(rootCid) + expect(getSpy.getCall(1).args[0]).to.deep.equal(childCid) + }) }) describe('.unpin', () => {