Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: ping (#1299)
Browse files Browse the repository at this point in the history
* feat: implement `ipfs ping` flags #928

* feat: first shot at ping implementaion

* fix: ETOOMANYSTREAMS 😢

* chore: cleanup on the ping component

* chore: ping component linting

* chore: bump js-ipfs-api and fix http ping validation

* chore: add test to ping cli command

* chore: add ping cli test

* chore: refactor ping component and some cleanup

* chore: add tests to the ping http API

* fix: no need to check for peerRouting method in ping

* chore: add tests for ping core functionality
  • Loading branch information
JGAntunes authored and daviddias committed May 8, 2018
1 parent 7bb838f commit 7bc90cc
Show file tree
Hide file tree
Showing 11 changed files with 610 additions and 3 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@
"progress": "^2.0.0",
"promisify-es6": "^1.0.3",
"pull-abortable": "^4.1.1",
"pull-catch": "^1.0.0",
"pull-defer": "^0.2.2",
"pull-file": "^1.1.0",
"pull-ndjson": "^0.1.1",
Expand Down
41 changes: 41 additions & 0 deletions src/cli/commands/ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const pull = require('pull-stream/pull')
const drain = require('pull-stream/sinks/drain')
const pullCatch = require('pull-catch')

const print = require('../utils').print

module.exports = {
command: 'ping <peerId>',

description: 'Measure the latency of a connection',

builder: {
count: {
alias: 'n',
type: 'integer',
default: 10
}
},

handler (argv) {
const peerId = argv.peerId
const count = argv.count || 10
pull(
argv.ipfs.pingPullStream(peerId, { count }),
pullCatch(err => {
throw err
}),
drain(({ Time, Text }) => {
// Check if it's a pong
if (Time) {
print(`Pong received: time=${Time} ms`)
// Status response
} else {
print(Text)
}
})
)
}
}
91 changes: 89 additions & 2 deletions src/core/components/ping.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,96 @@
'use strict'

const promisify = require('promisify-es6')
const debug = require('debug')
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
const PeerId = require('peer-id')
const pull = require('pull-stream/pull')
const Pushable = require('pull-pushable')
const ndjson = require('pull-ndjson')
const waterfall = require('async/waterfall')

const log = debug('jsipfs:ping')
log.error = debug('jsipfs:ping:error')

module.exports = function ping (self) {
return promisify((callback) => {
callback(new Error('Not implemented'))
return promisify((peerId, count, cb) => {
if (!self.isOnline()) {
return cb(new Error(OFFLINE_ERROR))
}

const source = Pushable()

const response = pull(
source,
ndjson.serialize()
)
waterfall([
getPeer.bind(null, self._libp2pNode, source, peerId),
runPing.bind(null, self._libp2pNode, source, count)
], (err) => {
if (err) {
log.error(err)
source.push(getPacket({Text: err.toString()}))
source.end(err)
}
})

cb(null, response)
})
}

function getPacket (msg) {
// Default msg
const basePacket = {Success: false, Time: 0, Text: ''}
return Object.assign({}, basePacket, msg)
}

function getPeer (libp2pNode, statusStream, peerId, cb) {
let peer
try {
peer = libp2pNode.peerBook.get(peerId)
return cb(null, peer)
} catch (err) {
log('Peer not found in peer book, trying peer routing')
// Share lookup status just as in the go implemmentation
statusStream.push(getPacket({Success: true, Text: `Looking up peer ${peerId}`}))
// Try to use peerRouting
libp2pNode.peerRouting.findPeer(PeerId.createFromB58String(peerId), cb)
}
}

function runPing (libp2pNode, statusStream, count, peer, cb) {
libp2pNode.ping(peer, (err, p) => {
log('Got peer', peer)
if (err) {
return cb(err)
}

let packetCount = 0
let totalTime = 0
statusStream.push(getPacket({Success: true, Text: `PING ${peer.id.toB58String()}`}))

p.on('ping', (time) => {
statusStream.push(getPacket({ Success: true, Time: time }))
totalTime += time
packetCount++
if (packetCount >= count) {
const average = totalTime / count
p.stop()
statusStream.push(getPacket({ Success: true, Text: `Average latency: ${average}ms` }))
statusStream.end()
}
})

p.on('error', (err) => {
log.error(err)
p.stop()
statusStream.push(getPacket({Text: err.toString()}))
statusStream.end(err)
})

p.start()

return cb()
})
}
1 change: 1 addition & 0 deletions src/http/api/resources/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
exports.version = require('./version')
exports.shutdown = require('./shutdown')
exports.id = require('./id')
exports.ping = require('./ping')
exports.bootstrap = require('./bootstrap')
exports.repo = require('./repo')
exports.object = require('./object')
Expand Down
38 changes: 38 additions & 0 deletions src/http/api/resources/ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict'

const Joi = require('joi')
const boom = require('boom')
const toStream = require('pull-stream-to-stream')
const PassThrough = require('readable-stream').PassThrough
const pump = require('pump')

exports = module.exports

exports.get = {
validate: {
query: Joi.object().keys({
n: Joi.number().greater(0),
count: Joi.number().greater(0),
arg: Joi.string().required()
}).xor('n', 'count').unknown()
},
handler: (request, reply) => {
const ipfs = request.server.app.ipfs
const peerId = request.query.arg
// Default count to 10
const count = request.query.n || request.query.count || 10
ipfs.ping(peerId, count, (err, pullStream) => {
if (err) {
return reply(boom.badRequest(err))
}
// Streams from pull-stream-to-stream don't seem to be compatible
// with the stream2 readable interface
// see: https://github.com/hapijs/hapi/blob/c23070a3de1b328876d5e64e679a147fafb04b38/lib/response.js#L533
// and: https://github.com/pull-stream/pull-stream-to-stream/blob/e436acee18b71af8e71d1b5d32eee642351517c7/index.js#L28
const responseStream = toStream.source(pullStream)
const stream2 = new PassThrough()
pump(responseStream, stream2)
return reply(stream2).type('application/json').header('X-Chunked-Output', '1')
})
}
}
1 change: 1 addition & 0 deletions src/http/api/routes/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = (server) => {
require('./object')(server)
require('./repo')(server)
require('./config')(server)
require('./ping')(server)
require('./swarm')(server)
require('./bitswap')(server)
require('./file')(server)
Expand Down
16 changes: 16 additions & 0 deletions src/http/api/routes/ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
'use strict'

const resources = require('./../resources')

module.exports = (server) => {
const api = server.select('API')

api.route({
method: '*',
path: '/api/v0/ping',
config: {
handler: resources.ping.get.handler,
validate: resources.ping.get.validate
}
})
}
3 changes: 2 additions & 1 deletion test/cli/commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
const expect = require('chai').expect
const runOnAndOff = require('../utils/on-and-off')

const commandCount = 73
const commandCount = 74

describe('commands', () => runOnAndOff((thing) => {
let ipfs

Expand Down
149 changes: 149 additions & 0 deletions test/cli/ping.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
const series = require('async/series')
const DaemonFactory = require('ipfsd-ctl')
const ipfsExec = require('../utils/ipfs-exec')

const df = DaemonFactory.create({ type: 'js' })
const expect = chai.expect
chai.use(dirtyChai)

const config = {
Bootstrap: [],
Discovery: {
MDNS: {
Enabled:
false
}
}
}

describe('ping', function () {
this.timeout(60 * 1000)
let ipfsdA
let ipfsdB
let bMultiaddr
let ipfsdBId
let cli

before((done) => {
this.timeout(60 * 1000)
series([
(cb) => {
df.spawn({
exec: `./src/cli/bin.js`,
config,
initOptions: { bits: 512 }
}, (err, _ipfsd) => {
expect(err).to.not.exist()
ipfsdB = _ipfsd
cb()
})
},
(cb) => {
ipfsdB.api.id((err, peerInfo) => {
expect(err).to.not.exist()
ipfsdBId = peerInfo.id
bMultiaddr = peerInfo.addresses[0]
cb()
})
}
], done)
})

before(function (done) {
this.timeout(60 * 1000)

df.spawn({
exec: './src/cli/bin.js',
config,
initoptions: { bits: 512 }
}, (err, _ipfsd) => {
expect(err).to.not.exist()
ipfsdA = _ipfsd
// Without DHT we need to have an already established connection
ipfsdA.api.swarm.connect(bMultiaddr, done)
})
})

before((done) => {
this.timeout(60 * 1000)
cli = ipfsExec(ipfsdA.repoPath)
done()
})

after((done) => ipfsdA.stop(done))
after((done) => ipfsdB.stop(done))

it('ping host', (done) => {
this.timeout(60 * 1000)
const ping = cli(`ping ${ipfsdBId}`)
const result = []
ping.stdout.on('data', (output) => {
const packets = output.toString().split('\n').slice(0, -1)
result.push(...packets)
})

ping.stdout.on('end', () => {
expect(result).to.have.lengthOf(12)
expect(result[0]).to.equal(`PING ${ipfsdBId}`)
for (let i = 1; i < 11; i++) {
expect(result[i]).to.match(/^Pong received: time=\d+ ms$/)
}
expect(result[11]).to.match(/^Average latency: \d+(.\d+)?ms$/)
done()
})

ping.catch((err) => {
expect(err).to.not.exist()
})
})

it('ping host with --n option', (done) => {
this.timeout(60 * 1000)
const ping = cli(`ping --n 1 ${ipfsdBId}`)
const result = []
ping.stdout.on('data', (output) => {
const packets = output.toString().split('\n').slice(0, -1)
result.push(...packets)
})

ping.stdout.on('end', () => {
expect(result).to.have.lengthOf(3)
expect(result[0]).to.equal(`PING ${ipfsdBId}`)
expect(result[1]).to.match(/^Pong received: time=\d+ ms$/)
expect(result[2]).to.match(/^Average latency: \d+(.\d+)?ms$/)
done()
})

ping.catch((err) => {
expect(err).to.not.exist()
})
})

it('ping host with --count option', (done) => {
this.timeout(60 * 1000)
const ping = cli(`ping --count 1 ${ipfsdBId}`)
const result = []
ping.stdout.on('data', (output) => {
const packets = output.toString().split('\n').slice(0, -1)
result.push(...packets)
})

ping.stdout.on('end', () => {
expect(result).to.have.lengthOf(3)
expect(result[0]).to.equal(`PING ${ipfsdBId}`)
expect(result[1]).to.match(/^Pong received: time=\d+ ms$/)
expect(result[2]).to.match(/^Average latency: \d+(.\d+)?ms$/)
done()
})

ping.catch((err) => {
expect(err).to.not.exist()
})
})
})
Loading

0 comments on commit 7bc90cc

Please sign in to comment.