Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

refactor: convert dht API to async/await #1156

Merged
merged 10 commits into from
Nov 14, 2019
Merged
39 changes: 39 additions & 0 deletions src/dht/find-peer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const log = require('debug')('ipfs-http-client:dht:find-peer')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (peerId, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${peerId}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findpeer', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
log(message)
// 2 = FinalPeer
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L18
if (message.Type === 2 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
40 changes: 40 additions & 0 deletions src/dht/find-provs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const log = require('debug')('ipfs-http-client:dht:find-provs')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = configure(({ ky }) => {
return (cid, options) => (async function * () {
options = options || {}

const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${cid}`)
if (options.numProviders) searchParams.set('num-providers', options.numProviders)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

const res = await ky.get('dht/findprovs', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

for await (const message of ndjson(toIterable(res.body))) {
log(message)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to log the message?

Here and in all the other DHT related calls?

// 4 = Provider
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L20
if (message.Type === 4 && message.Responses) {
for (const { ID, Addrs } of message.Responses) {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
yield peerInfo
}
}
}
})()
})
63 changes: 0 additions & 63 deletions src/dht/findpeer.js

This file was deleted.

63 changes: 0 additions & 63 deletions src/dht/findprovs.js

This file was deleted.

64 changes: 24 additions & 40 deletions src/dht/get.js
Original file line number Diff line number Diff line change
@@ -1,48 +1,32 @@
'use strict'

const promisify = require('promisify-es6')
const ndjson = require('iterable-ndjson')
const log = require('debug')('ipfs-http-client:dht:get')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')

module.exports = (send) => {
return promisify((key, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (key, options) => (async function * () {
options = options || {}

function handleResult (done, err, res) {
if (err) {
return done(err)
}
if (!res) {
return done(new Error('empty response'))
}
if (res.length === 0) {
return done(new Error('no value returned for key'))
}
const searchParams = new URLSearchParams(options.searchParams)
searchParams.set('arg', `${key}`)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

// Inconsistent return values in the browser vs node
if (Array.isArray(res)) {
res = res[0]
}
const res = await ky.get('dht/get', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

if (res.Type === 5) {
done(null, res.Extra)
} else {
done(new Error('key was not found (type 6)'))
for await (const message of ndjson(toIterable(res.body))) {
log(message)
// 5 = Value
// https://github.com/libp2p/go-libp2p-core/blob/6e566d10f4a5447317a66d64c7459954b969bdab/routing/query.go#L21
if (message.Type === 5) {
yield message.Extra
}
}

send({
path: 'dht/get',
args: key,
qs: opts
}, handleResult.bind(null, callback))
})
}
})()
})
31 changes: 22 additions & 9 deletions src/dht/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,30 @@
'use strict'

const moduleConfig = require('../utils/module-config')
const callbackify = require('callbackify')
const errCode = require('err-code')
const { collectify } = require('../lib/converters')

module.exports = (arg) => {
const send = moduleConfig(arg)
module.exports = config => {
const get = require('./get')(config)
const findPeer = require('./find-peer')(config)

return {
get: require('./get')(send),
put: require('./put')(send),
findProvs: require('./findprovs')(send),
findPeer: require('./findpeer')(send),
provide: require('./provide')(send),
get: callbackify.variadic(async (key, options) => {
for await (const value of get(key, options)) {
return value
}
throw errCode(new Error('value not found'), 'ERR_TYPE_5_NOT_FOUND')
}),
put: callbackify.variadic(collectify(require('./put')(config))),
findProvs: callbackify.variadic(collectify(require('./find-provs')(config))),
findPeer: callbackify.variadic(async (peerId, options) => {
for await (const peerInfo of findPeer(peerId, options)) {
return peerInfo
}
throw errCode(new Error('final peer not found'), 'ERR_TYPE_2_NOT_FOUND')
}),
provide: callbackify.variadic(collectify(require('./provide')(config))),
// find closest peerId to given peerId
query: require('./query')(send)
query: callbackify.variadic(collectify(require('./query')(config)))
}
}
65 changes: 35 additions & 30 deletions src/dht/provide.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,42 @@
'use strict'

const promisify = require('promisify-es6')
const CID = require('cids')
const PeerId = require('peer-id')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const ndjson = require('iterable-ndjson')
const log = require('debug')('ipfs-http-client:dht:provide')
const configure = require('../lib/configure')
const toIterable = require('../lib/stream-to-iterable')
const toCamel = require('../lib/object-to-camel')

module.exports = (send) => {
return promisify((cids, opts, callback) => {
if (typeof opts === 'function' && !callback) {
callback = opts
opts = {}
}
module.exports = configure(({ ky }) => {
return (cids, options) => (async function * () {
cids = Array.isArray(cids) ? cids : [cids]
options = options || {}

// opts is the real callback --
// 'callback' is being injected by promisify
if (typeof opts === 'function' && typeof callback === 'function') {
callback = opts
opts = {}
}
const searchParams = new URLSearchParams(options.searchParams)
cids.forEach(cid => searchParams.append('arg', `${cid}`))
if (options.recursive != null) searchParams.set('recursive', options.recursive)
if (options.verbose != null) searchParams.set('verbose', options.verbose)

if (!Array.isArray(cids)) {
cids = [cids]
}
const res = await ky.get('dht/provide', {
timeout: options.timeout,
signal: options.signal,
headers: options.headers,
searchParams
})

// Validate CID(s) and serialize
try {
cids = cids.map(cid => new CID(cid).toBaseEncodedString('base58btc'))
} catch (err) {
return callback(err)
for await (let message of ndjson(toIterable(res.body))) {
log(message)
message = toCamel(message)
if (message.responses) {
message.responses = message.responses.map(({ ID, Addrs }) => {
const peerInfo = new PeerInfo(PeerId.createFromB58String(ID))
if (Addrs) Addrs.forEach(a => peerInfo.multiaddrs.add(multiaddr(a)))
return peerInfo
})
}
yield message
}

send({
path: 'dht/provide',
args: cids,
qs: opts
}, callback)
})
}
})()
})
Loading