Skip to content

Commit

Permalink
fix: OOM on large DAGs (#410)
Browse files Browse the repository at this point in the history
Storing a set of seen CIDs to short-cut DAG traversal while ensuring we have all the blocks in a DAG in the blockstore can cause OOMs for very large DAGs so replace the unbounded `Set` with a `LRU` cache with a configurable maximum size.
  • Loading branch information
achingbrain authored Sep 21, 2022
1 parent c35a5db commit 336d0b9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
16 changes: 11 additions & 5 deletions packages/ipfs-repo-migrations/src/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* eslint complexity: ["error", 27] */
/* eslint complexity: ["error", 28] */

import defaultMigrations from '../migrations/index.js'
import * as repoVersion from './repo/version.js'
Expand Down Expand Up @@ -43,8 +43,11 @@ export function getLatestMigrationVersion (migrations) {
* @param {number} toVersion - Version to which the repo should be migrated.
* @param {MigrationOptions} [options] - Options for migration
*/
export async function migrate (path, backends, repoOptions, toVersion, { ignoreLock = false, onProgress, isDryRun = false, migrations }) {
migrations = migrations || defaultMigrations
export async function migrate (path, backends, repoOptions, toVersion, options = {}) {
const ignoreLock = options.ignoreLock ?? false
const onProgress = options.onProgress
const isDryRun = options.isDryRun ?? false
const migrations = options.migrations ?? defaultMigrations

if (!path) {
throw new errors.RequiredParameterError('Path argument is required!')
Expand Down Expand Up @@ -143,8 +146,11 @@ export async function migrate (path, backends, repoOptions, toVersion, { ignoreL
* @param {number} toVersion - Version to which the repo will be reverted.
* @param {MigrationOptions} [options] - Options for the reversion
*/
export async function revert (path, backends, repoOptions, toVersion, { ignoreLock = false, onProgress, isDryRun = false, migrations }) {
migrations = migrations || defaultMigrations
export async function revert (path, backends, repoOptions, toVersion, options = {}) {
const ignoreLock = options.ignoreLock ?? false
const onProgress = options.onProgress
const isDryRun = options.isDryRun ?? false
const migrations = options.migrations ?? defaultMigrations

if (!path) {
throw new errors.RequiredParameterError('Path argument is required!')
Expand Down
1 change: 1 addition & 0 deletions packages/ipfs-repo/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@
"multiformats": "^9.0.4",
"p-queue": "^7.3.0",
"proper-lockfile": "^4.0.0",
"quick-lru": "^6.1.1",
"sort-keys": "^5.0.0",
"uint8arrays": "^3.0.0"
},
Expand Down
31 changes: 20 additions & 11 deletions packages/ipfs-repo/src/pin-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,16 @@ import {
} from './utils/blockstore.js'
import { walkDag } from './utils/walk-dag.js'
import { PinTypes } from './pin-types.js'
import QuickLRU from 'quick-lru'

/**
* @typedef {import('./types').PinType} PinType
* @typedef {import('./types').PinQueryType} PinQueryType
* @typedef {import('multiformats/codecs/interface').BlockCodec<any, any>} BlockCodec
* @typedef {import('./types').PinOptions} PinOptions
* @typedef {import('./types').AbortOptions} AbortOptions
* @typedef {import('./types').Pins} Pins
*/

/**
* @typedef {object} PinInternal
Expand All @@ -23,14 +33,13 @@ import { PinTypes } from './pin-types.js'
*/

/**
* @typedef {import('./types').PinType} PinType
* @typedef {import('./types').PinQueryType} PinQueryType
* @typedef {import('multiformats/codecs/interface').BlockCodec<any, any>} BlockCodec
* @typedef {import('./types').PinOptions} PinOptions
* @typedef {import('./types').AbortOptions} AbortOptions
* @typedef {import('./types').Pins} Pins
* @typedef {object} FetchCompleteDagOptions
* @property {AbortSignal} [signal]
* @property {number} [cidCacheMaxSize]
*/

const CID_CACHE_MAX_SIZE = 2048

/**
* @param {string} type
*/
Expand Down Expand Up @@ -95,7 +104,7 @@ export class PinManager {

/**
* @param {CID} cid
* @param {PinOptions & AbortOptions} [options]
* @param {PinOptions & FetchCompleteDagOptions & AbortOptions} [options]
*/
async pinRecursively (cid, options = {}) {
await this.fetchCompleteDag(cid, options)
Expand Down Expand Up @@ -271,10 +280,10 @@ export class PinManager {

/**
* @param {CID} cid
* @param {AbortOptions} options
* @param {FetchCompleteDagOptions} [options]
*/
async fetchCompleteDag (cid, options) {
const seen = new Set()
async fetchCompleteDag (cid, options = {}) {
const seen = new QuickLRU({ maxSize: options.cidCacheMaxSize ?? CID_CACHE_MAX_SIZE })

/**
* @param {CID} cid
Expand All @@ -285,7 +294,7 @@ export class PinManager {
return
}

seen.add(cid.toString())
seen.set(cid.toString(), true)

const bytes = await this.blockstore.get(cid, options)
const codec = await this.loadCodec(cid.code)
Expand Down
29 changes: 29 additions & 0 deletions packages/ipfs-repo/test/pins-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { CID } from 'multiformats/cid'
import all from 'it-all'
import { PinTypes } from '../src/pin-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import Sinon from 'sinon'

/**
* @param {import('@ipld/dag-pb').PBNode} node
Expand Down Expand Up @@ -105,6 +106,34 @@ export default (repo) => {
expect(pins.filter(p => p.cid.toString() === cid.toString()))
.to.have.deep.nested.property('[0].metadata', metadata)
})

it('does not traverse the same linked node twice', async () => {
// @ts-expect-error blockstore property is private
const getSpy = Sinon.spy(repo.pins.blockstore, 'get')

const { cid: childCid, buf: childBuf } = await createDagPbNode()
await repo.blocks.put(childCid, childBuf)

// create a root block with duplicate links to the same block
const { cid: rootCid, buf: rootBuf } = await createDagPbNode({
Links: [{
Name: 'child-1',
Tsize: childBuf.byteLength,
Hash: childCid
}, {
Name: 'child-2',
Tsize: childBuf.byteLength,
Hash: childCid
}]
})
await repo.blocks.put(rootCid, rootBuf)

await repo.pins.pinRecursively(rootCid)

expect(getSpy.callCount).to.equal(2, 'should only have loaded the child block once')
expect(getSpy.getCall(0).args[0]).to.deep.equal(rootCid)
expect(getSpy.getCall(1).args[0]).to.deep.equal(childCid)
})
})

describe('.unpin', () => {
Expand Down

0 comments on commit 336d0b9

Please sign in to comment.