Skip to content

Commit

Permalink
feat: make blockstore identity-hash compatible (#297)
Browse files Browse the repository at this point in the history
- Separate out typing for the blockstore interface
- Implement `idstore` based on https://github.com/ipfs/go-ipfs-blockstore/blob/master/idstore.go - this leaves the original blockstore implementation unchanged but adds a composable layer applied on top to add compatibility with identity hashes (served as the same blockstore interface)
- add `idstore` in construction of `repo.blocks`

fix ipfs/js-ipfs#3289
  • Loading branch information
hannahhoward authored Apr 14, 2021
1 parent a36e695 commit bbcdb12
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 65 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@
"ipfs-repo-migrations": "^7.0.1",
"ipfs-utils": "^6.0.0",
"ipld-block": "^0.11.0",
"it-filter": "^1.0.2",
"it-map": "^1.0.2",
"it-pushable": "^1.4.0",
"just-safe-get": "^2.0.0",
"just-safe-set": "^2.1.0",
"merge-options": "^3.0.4",
"multibase": "^4.0.1",
"multihashes": "^4.0.2",
"p-queue": "^6.0.0",
"proper-lockfile": "^4.0.0",
"sort-keys": "^4.0.0",
Expand Down
61 changes: 3 additions & 58 deletions src/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const pushable = require('it-pushable')
* @typedef {import("interface-datastore").Datastore} Datastore
* @typedef {import("interface-datastore").Options} DatastoreOptions
* @typedef {import("cids")} CID
* @typedef {import('./types').Blockstore} Blockstore
*/

/**
Expand All @@ -36,19 +37,14 @@ function maybeWithSharding (filestore, options) {

/**
* @param {Datastore | ShardingDatastore} store
* @returns {Blockstore}
*/
function createBaseStore (store) {
return {
open () {
return store.open()
},
/**
* Query the store
*
* @param {Query} query
* @param {DatastoreOptions} [options]
* @returns {AsyncIterable<Block|CID>}
*/

async * query (query, options) {
for await (const { key, value } of store.query(query, options)) {
// TODO: we should make this a different method
Expand All @@ -61,40 +57,19 @@ function createBaseStore (store) {
}
},

/**
* Get a single block by CID
*
* @param {CID} cid
* @param {DatastoreOptions} [options]
* @returns {Promise<Block>}
*/
async get (cid, options) {
const key = cidToKey(cid)
const blockData = await store.get(key, options)

return new Block(blockData, cid)
},

/**
* Like get, but for more
*
* @param {Iterable<CID> | AsyncIterable<CID>} cids
* @param {DatastoreOptions} [options]
* @returns {AsyncIterable<Block>}
*/
async * getMany (cids, options) {
for await (const cid of cids) {
yield this.get(cid, options)
}
},

/**
* Write a single block to the store
*
* @param {Block} block
* @param {DatastoreOptions} [options]
* @returns {Promise<Block>}
*/
async put (block, options) {
if (!Block.isBlock(block)) {
throw new Error('invalid block')
Expand All @@ -110,13 +85,6 @@ function createBaseStore (store) {
return block
},

/**
* Like put, but for more
*
* @param {AsyncIterable<Block>|Iterable<Block>} blocks
* @param {DatastoreOptions} [options]
* @returns {AsyncIterable<Block>}
*/
async * putMany (blocks, options) { // eslint-disable-line require-await
// we cannot simply chain to `store.putMany` because we convert a CID into
// a key based on the multihash only, so we lose the version & codec and
Expand Down Expand Up @@ -158,41 +126,18 @@ function createBaseStore (store) {
yield * output
},

/**
* Does the store contain block with this CID?
*
* @param {CID} cid
* @param {DatastoreOptions} [options]
*/
has (cid, options) {
return store.has(cidToKey(cid), options)
},

/**
* Delete a block from the store
*
* @param {CID} cid
* @param {DatastoreOptions} [options]
* @returns {Promise<void>}
*/
delete (cid, options) {
return store.delete(cidToKey(cid), options)
},

/**
* Delete a block from the store
*
* @param {AsyncIterable<any> | Iterable<any>} cids
* @param {DatastoreOptions} [options]
*/
deleteMany (cids, options) {
return store.deleteMany(map(cids, cid => cidToKey(cid)), options)
},

/**
* Close the store
*
*/
close () {
return store.close()
}
Expand Down
4 changes: 3 additions & 1 deletion src/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ module.exports = (store) => {
const value = m.value
if (key) {
const config = await configStore.get()
_set(config, key, value)
if (typeof config === 'object' && config !== null) {
_set(config, key, value)
}
return _saveAll(config)
}
return _saveAll(value)
Expand Down
138 changes: 138 additions & 0 deletions src/idstore.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
'use strict'

const Block = require('ipld-block')
const filter = require('it-filter')
const mh = require('multihashes')
const pushable = require('it-pushable')
const drain = require('it-drain')
const CID = require('cids')
const errcode = require('err-code')

/**
* @typedef {import("interface-datastore").Query} Query
* @typedef {import("interface-datastore").Datastore} Datastore
* @typedef {import("interface-datastore").Options} DatastoreOptions
* @typedef {import('./types').Blockstore} Blockstore
*/

/**
*
* @param {Blockstore} blockstore
*/
module.exports = createIdStore

/**
* @param {Blockstore} store
* @returns {Blockstore}
*/
function createIdStore (store) {
return {
open () {
return store.open()
},
query (query, options) {
return store.query(query, options)
},

async get (cid, options) {
const extracted = extractContents(cid)
if (extracted.isIdentity) {
return Promise.resolve(new Block(extracted.digest, cid))
}
return store.get(cid, options)
},

async * getMany (cids, options) {
for await (const cid of cids) {
yield this.get(cid, options)
}
},

async put (block, options) {
const { isIdentity } = extractContents(block.cid)
if (isIdentity) {
return Promise.resolve(block)
}
return store.put(block, options)
},

async * putMany (blocks, options) {
// in order to return all blocks. we're going to assemble a seperate iterable
// return rather than return the resolves of store.putMany using the same
// process used by blockstore.putMany
const output = pushable()

// process.nextTick runs on the microtask queue, setImmediate runs on the next
// event loop iteration so is slower. Use process.nextTick if it is available.
const runner = process && process.nextTick ? process.nextTick : setImmediate

runner(async () => {
try {
await drain(store.putMany(async function * () {
for await (const block of blocks) {
if (!extractContents(block.cid).isIdentity) {
yield block
}
// if non identity blocks successfully write, blocks are included in output
output.push(block)
}
}()))

output.end()
} catch (err) {
output.end(err)
}
})

yield * output
},

has (cid, options) {
const { isIdentity } = extractContents(cid)
if (isIdentity) {
return Promise.resolve(true)
}
return store.has(cid, options)
},

delete (cid, options) {
const { isIdentity } = extractContents(cid)
if (isIdentity) {
return Promise.resolve()
}
return store.delete(cid, options)
},

deleteMany (cids, options) {
return store.deleteMany(filter(cids, (cid) => !extractContents(cid).isIdentity), options)
},

close () {
return store.close()
}
}
}

/**
* @param {CID} k
* @returns {{ isIdentity: false } | { isIdentity: true, digest: Uint8Array}}
*/
function extractContents (k) {
if (!CID.isCID(k)) {
throw errcode(new Error('Not a valid cid'), 'ERR_INVALID_CID')
}

// Pre-check by calling Prefix(), this much faster than extracting the hash.
const decoded = mh.decode(k.multihash)

if (decoded.name !== 'identity') {
return {
isIdentity: false
}
}

return {
isIdentity: true,
digest: decoded.digest
}
}
9 changes: 5 additions & 4 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const config = require('./config')
const spec = require('./spec')
const apiAddr = require('./api-addr')
const blockstore = require('./blockstore')
const idstore = require('./idstore')
const defaultOptions = require('./default-options')
const defaultDatastore = require('./default-datastore')
const ERRORS = require('./errors')
Expand Down Expand Up @@ -66,8 +67,8 @@ class IpfsRepo {
this.keys = backends.create('keys', pathJoin(this.path, 'keys'), this.options)
this.pins = backends.create('pins', pathJoin(this.path, 'pins'), this.options)
const blocksBaseStore = backends.create('blocks', pathJoin(this.path, 'blocks'), this.options)
this.blocks = blockstore(blocksBaseStore, this.options.storageBackendOptions.blocks)

const blockStore = blockstore(blocksBaseStore, this.options.storageBackendOptions.blocks)
this.blocks = idstore(blockStore)
this.version = version(this.root)
this.config = config(this.root)
this.spec = spec(this.root)
Expand Down Expand Up @@ -437,7 +438,7 @@ module.exports.errors = ERRORS
* @param {any} _config
*/
function buildConfig (_config) {
_config.datastore = Object.assign({}, defaultDatastore, _get(_config, 'datastore', {}))
_config.datastore = Object.assign({}, defaultDatastore, _get(_config, 'datastore'))

return _config
}
Expand All @@ -449,7 +450,7 @@ function buildDatastoreSpec (_config) {
/** @type { {type: string, mounts: Array<{mountpoint: string, type: string, prefix: string, child: {type: string, path: 'string', sync: boolean, shardFunc: string}}>}} */
const spec = {
...defaultDatastore.Spec,
..._get(_config, 'datastore.Spec', {})
..._get(_config, 'datastore.Spec')
}

return {
Expand Down
57 changes: 56 additions & 1 deletion src/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { Datastore } from 'interface-datastore'
import type { Datastore, Options as DatastoreOptions, Query } from 'interface-datastore'
import type { BigNumber } from 'bignumber.js'

import type CID from 'cids'

export type AwaitIterable<T> = Iterable<T> | AsyncIterable<T>
export type Await<T> = Promise<T> | T

Expand Down Expand Up @@ -56,3 +58,56 @@ export interface Stat {
numObjects: BigNumber
repoSize: BigNumber
}

export interface Block {
cid: CID
data: Uint8Array
}

export interface Blockstore {
open: () => Promise<Void>
/**
* Query the store
*/
query: (query: Query, options?: DatastoreOptions) => AsyncIterable<Block|CID>

/**
* Get a single block by CID
*/
get: (cid: CID, options?: DatastoreOptions) => Promise<Block>

/**
* Like get, but for more
*/
getMany: (cids: AwaitIterable<CID>, options?: DatastoreOptions) => AsyncIterable<Block>

/**
* Write a single block to the store
*/
put: (block: Block, options?: DatastoreOptions) => Promise<Block>

/**
* Like put, but for more
*/
putMany: (blocks: AwaitIterable<Block>, options?: DatastoreOptions) => AsyncIterable<Block>

/**
* Does the store contain block with this CID?
*/
has: (cid: CID, options?: DatastoreOptions) => Promise<boolean>

/**
* Delete a block from the store
*/
delete: (cid: CID, options?: DatastoreOptions) => Promise<Void>

/**
* Delete a block from the store
*/
deleteMany: (cids: AwaitIterable<any>, options?: DatastoreOptions) => AsyncIterable<Key>

/**
* Close the store
*/
close: () => Promise<Void>
}
Loading

0 comments on commit bbcdb12

Please sign in to comment.