From e9f991716f09f19a3e9de674a84338d6bff419a1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 2 Apr 2018 22:57:45 -0600 Subject: [PATCH] feat: improve performance --- package.json | 6 +- profile.js | 89 ++++++++++++++++++++ src/channel.js | 111 ++++++++----------------- src/index.js | 129 +++++++++++++++-------------- src/utils.js | 55 ++++++------- test/channel.spec.js | 188 +++++++++++++++++++++---------------------- test/mplex.spec.js | 82 ------------------- test/plex.spec.js | 92 +++++++++++++++++++++ test/utils.spec.js | 37 +++++++++ 9 files changed, 442 insertions(+), 347 deletions(-) create mode 100644 profile.js delete mode 100644 test/mplex.spec.js create mode 100644 test/plex.spec.js create mode 100644 test/utils.spec.js diff --git a/package.json b/package.json index 7856750..228d7c9 100644 --- a/package.json +++ b/package.json @@ -19,14 +19,16 @@ "license": "MIT", "devDependencies": { "aegir": "^13.0.6", - "pull-abortable": "^4.1.1" + "async": "^2.6.0", + "chai-checkmark": "^1.0.1", + "pull-abortable": "^4.1.1", + "pull-generate": "^2.2.0" }, "repository": { "type": "git", "url": "git+https://github.com/dryajov/pull-plex.git" }, "dependencies": { - "async": "^2.6.0", "chai": "^4.1.2", "debug": "^3.1.0", "dirty-chai": "^2.0.1", diff --git a/profile.js b/profile.js new file mode 100644 index 0000000..d408853 --- /dev/null +++ b/profile.js @@ -0,0 +1,89 @@ +'use strict' + +const pair = require('pull-pair/duplex') +const pull = require('pull-stream') +const generate = require('pull-generate') +const each = require('async/each') +const eachLimit = require('async/eachLimit') +const setImmediate = require('async/setImmediate') + +const Plex = require('./src') + +const spawn = (nStreams, nMsg, done, limit) => { + const p = pair() + + const check = marker(2 * nStreams, done) + + const msg = 'simple msg' + + const listener = new Plex(false) + const dialer = new Plex(true) + + pull(dialer, p[0], dialer) + pull(listener, p[1], listener) + + listener.on('stream', (stream) => { + pull( + stream, + pull.onEnd((err) => { + if (err) { return done(err) } + check() + pull(pull.empty(), stream) + }) + ) + }) + + const numbers = [] + for (let i = 0; i < nStreams; i++) { + numbers.push(i) + } + + const spawnStream = (n, cb) => { + const stream = dialer.createStream() + pull( + generate(0, (s, cb) => { + setImmediate(() => { + cb(s === nMsg ? true : null, msg, s + 1) + }) + }), + stream, + pull.collect((err) => { + if (err) { return done(err) } + check() + cb() + }) + ) + } + + if (limit) { + eachLimit(numbers, limit, spawnStream, () => {}) + } else { + each(numbers, spawnStream, () => {}) + } +} + +function marker (n, done) { + let i = 0 + return (err) => { + i++ + + // console.log(`${i} out of ${n} interactions`) + if (err) { + console.error('Failed after %s iterations', i) + return done(err) + } + + if (i === n) { + done() + } + } +} + + +spawn(1000, 1000, (err) => { + if (err) { + throw err + } + console.log('Done') + process.exit(0) +}, 50000) diff --git a/src/channel.js b/src/channel.js index f8831df..bd9c078 100644 --- a/src/channel.js +++ b/src/channel.js @@ -3,7 +3,6 @@ const pushable = require('pull-pushable') const consts = require('./consts') -const utils = require('./utils') const EE = require('events') const debug = require('debug') @@ -41,9 +40,9 @@ class Channel extends EE { this._msgs = pushable((err) => { this._log('source closed', err) if (this._reset) { return } // don't try closing the channel on reset - this.endChan((err) => { - if (err) { setImmediate(() => this.emit('error', err)) } - }) + + this.endChan() + if (err) { this.emit('error', err) } }) this._source = this._msgs @@ -59,28 +58,20 @@ class Channel extends EE { // source ended, close the stream if (end === true) { - this.endChan((err) => { - if (err) { - log.err(err) - setImmediate(() => this.emit('error', err)) - } - }) - return + return this.endChan() } // source errored, reset stream if (end || this._reset) { - this.resetChan(() => { - setImmediate(() => this.emit('error', end || this._reset)) - this.reset() - }) + this.resetChan() + this.emit('error', end || this._reset) + this.reset() return } // just send - return this.sendMsg(data, (err) => { - read(err, next) - }) + this.sendMsg(data) + return read(null, next) } read(null, next) @@ -126,97 +117,63 @@ class Channel extends EE { this.close(this._reset) } - openChan (cb) { + openChan () { this._log('openChan') - this.open = true // avoid duplicate open msgs - utils.encodeMsg(this._id, - consts.NEW, - this._name, - (err, data) => { - if (err) { - log.err(err) - this.open = false - return cb(err) - } - - this._plex.push(data) - cb(null, this) - }) + this.open = true + this._plex.push([ + this._id, + consts.type.NEW, + this._name + ]) } - sendMsg (data, cb) { + sendMsg (data) { this._log('sendMsg', data) if (!this.open) { - return this.openChan((err) => { - if (err) { - log.err(err) - return cb(err) - } - - this.sendMsg(data, cb) - }) + this.openChan() } - utils.encodeMsg(this._id, + this._plex.push([ + this._id, this._initiator ? consts.type.OUT_MESSAGE : consts.type.IN_MESSAGE, - data, - (err, data) => { - if (err) { - log.err(err) - return cb(err) - } - - this._plex.push(data) - cb() - }) + data + ]) } - endChan (cb) { + endChan () { this._log('endChan') if (!this.open) { - return cb() + return } - utils.encodeMsg(this._id, + this._plex.push([ + this._id, this._initiator ? consts.type.OUT_CLOSE : consts.type.IN_CLOSE, - '', - (err, data) => { - if (err) { - log.err(err) - return cb(err) - } - this._plex.push(data) - cb() - }) + '' + ]) } - resetChan (cb) { + resetChan () { this._log('endChan') if (!this.open) { - return cb() + return } - utils.encodeMsg(this._id, + this._plex.push([ + this._id, this._initiator ? consts.type.OUT_RESET : consts.type.IN_RESET, - '', - (err, data) => { - if (err) { - log.err(err) - return cb(err) - } - this._plex.push(data) - cb() - }) + '' + ]) } } diff --git a/src/index.js b/src/index.js index 5ab361e..5d750d8 100644 --- a/src/index.js +++ b/src/index.js @@ -1,5 +1,6 @@ 'use strict' +const pull = require('pull-stream') const pushable = require('pull-pushable') const EE = require('events') @@ -13,11 +14,11 @@ const debug = require('debug') const log = debug('pull-plex') log.err = debug('pull-plex:err') -class Mplex extends EE { +class Plex extends EE { constructor (initiator, onChan) { super() - this._initiator = initiator || true - this._chanId = 1 + this._initiator = !!initiator + this._chanId = this._initiator ? 1 : 0 this._channels = {} this._log = (name, data) => { @@ -41,19 +42,31 @@ class Mplex extends EE { this.on('stream', (chan) => onChan(chan, chan.id)) } - this.source = this._chandata + this.source = pull( + this._chandata, + utils.encode() + ) + + this.sink = pull( + utils.decode(), + (read) => { + const next = (end, data) => { + if (end === true) { return } + if (end) { return this.destroy(end) } + this._handle(data) + return read(null, next) + } - this.sink = (read) => { - const next = (end, data) => { - if (end === true) { return } - if (end) { return this.destroy(end) } - return this._handle(data, (err) => { - read(err, next) - }) - } + read(null, next) + }) + } - read(null, next) - } + get initiator () { + return this._initiator + } + + get initiator () { + return this._initiator } destroy (err) { @@ -62,7 +75,7 @@ class Mplex extends EE { .keys(this._channels) .forEach((id) => { const chan = this._channels[id] - chan.close(err) + chan.reset(err) delete this._channels[id] }) @@ -77,12 +90,8 @@ class Mplex extends EE { this._chandata.push(data) } - nextChanId (initiator) { - let inc = 1 - if (initiator) { inc = 1 } - this._chanId += inc + 1 - - return this._chanId + _nextChanId () { + return this._chanId += 2 } createStream (name) { @@ -101,7 +110,7 @@ class Mplex extends EE { open = false } - id = id || this.nextChanId(initiator) + id = id || this._nextChanId(initiator) const chan = new Channel(id, name || id.toString(), this, @@ -112,56 +121,56 @@ class Mplex extends EE { delete this._channels[id] }) + if (this._channels[id]) { + return this.emit('error', new Error(`channel with id ${id} already exist!`)) + } + this._channels[id] = chan return chan } - _handle (msg, cb) { - utils.decodeMsg(msg, (err, _data) => { - if (err) { return cb(err) } - const { id, type } = _data[0] - const data = _data[1] - switch (type) { - case consts.type.NEW: { - if (!this._initiator && (id & 1) !== 1) { - return this.emit('error', - new Error(`Initiator can't have even id's!`)) - } - - const chan = this._newStream(id, this._initiator, true, data.toString()) - setImmediate(() => this.emit('stream', chan)) - return cb() + _handle (msg) { + const { id, type, data } = msg + switch (type) { + case consts.type.NEW: { + if (!this._initiator && (id & 1) !== 1) { + return this.emit('error', + new Error(`Initiator can't have even id's!`)) } - case consts.type.OUT_MESSAGE: - case consts.type.IN_MESSAGE: { - const chan = this._channels[id] - if (chan) { - chan.push(data) - } - return cb() + const chan = this._newStream(id, this._initiator, true, data.toString()) + setImmediate(() => this.emit('stream', chan)) + return + } + + case consts.type.OUT_MESSAGE: + case consts.type.IN_MESSAGE: { + const chan = this._channels[id] + if (chan) { + chan.push(data) } + return + } - case consts.type.OUT_CLOSE: - case consts.type.IN_CLOSE: { - const chan = this._channels[id] - if (chan) { - chan.close() - } - return cb() + case consts.type.OUT_CLOSE: + case consts.type.IN_CLOSE: { + const chan = this._channels[id] + if (chan) { + chan.close() } + return + } - case consts.type.OUT_RESET: - case consts.type.IN_RESET: { - const chan = this._channels[id] - if (chan) { - chan.reset() - } - return cb() + case consts.type.OUT_RESET: + case consts.type.IN_RESET: { + const chan = this._channels[id] + if (chan) { + chan.reset() } + return } - }) + } } } -module.exports = Mplex +module.exports = Plex diff --git a/src/utils.js b/src/utils.js index 7a187ee..368e421 100644 --- a/src/utils.js +++ b/src/utils.js @@ -6,37 +6,30 @@ const lp = require('pull-length-prefixed') const cat = require('pull-cat') const through = require('pull-through') -exports.encodeMsg = (id, type, data, cb) => { - return pull( - cat([ - pull.values([varint.encode(id << 3 | type)]), - pull( - pull.values([Buffer.from(data)]), - lp.encode() - ) - ]), - pull.flatten(), - pull.collect((err, data) => { - if (err) { return cb(err) } - cb(null, Buffer.from(data)) - }) - ) +exports.encode = () => { + return through(function (msg) { + const data = Buffer.concat([ + Buffer.from(varint.encode(msg[0] << 3 | msg[1])), + Buffer.from(varint.encode(Buffer.byteLength(msg[2]))), + Buffer.from(msg[2]) + ]) + this.queue(data) + }) } -exports.decodeMsg = (msg, cb) => { - let h = null - return pull( - pull.values([msg]), - through(function (buf) { - const header = varint.decode(buf) - h = { id: header >> 3, type: header & 7 } - this.queue(buf.slice(varint.decode.bytes)) - this.queue(null) - }), - lp.decode(), - pull.collect((err, data) => { - if (err) { return cb(err) } - cb(null, [h, data[0]]) - }) - ) +exports.decode = () => { + return through(function (msg) { + let offset = 0 + const h = varint.decode(msg) + offset += varint.decode.bytes + const length = varint.decode(msg.slice(offset)) + offset += varint.decode.bytes + const decoded = { + id: h >> 3, + type: h & 7, + data: msg.slice(offset /*, length*/) // somehow length gets offset and truncates the buffer + } + + this.queue(decoded) + }) } diff --git a/test/channel.spec.js b/test/channel.spec.js index 667ad93..2797757 100644 --- a/test/channel.spec.js +++ b/test/channel.spec.js @@ -5,6 +5,7 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect +chai.use(require('chai-checkmark')) chai.use(dirtyChai) const pull = require('pull-stream') @@ -12,18 +13,28 @@ const pair = require('pull-pair/duplex') const pushable = require('pull-pushable') const abortable = require('pull-abortable') -const Mplex = require('../src') +const Plex = require('../src') const utils = require('../src/utils') const consts = require('../src/consts') const series = require('async/series') +function closeAndWait (stream) { + pull( + pull.empty(), + stream, + pull.onEnd((err) => { + expect(err).to.not.exist.mark() + }) + ) +} + describe('channel', () => { - it('initiator should be able to send data between two multiplexers', (done) => { + it('initiator should be able to send data', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -39,18 +50,18 @@ describe('channel', () => { ) }) - const stream = plex1._newStream(plex1.nextChanId(true), true, 'stream 1') + const stream = plex1.createStream('stream 1') pull( pull.values([Buffer.from('hello from plex1!!')]), stream ) }) - it('receiver should be able to send data between two multiplexers', (done) => { + it('receiver should be able to send data', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -73,11 +84,11 @@ describe('channel', () => { }) }) - it('stream can be piped with itself (echo)', (done) => { + it('stream can be piped to itself (echo)', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -102,46 +113,11 @@ describe('channel', () => { ) }) - it('sending close msg closes stream', (done) => { - const plex = new Mplex(true) - - plex.on('stream', (stream) => { - pull( - stream, - pull.collect((err, data) => { - expect(err).to.not.exist() - expect(data[0]).to.deep.eql(Buffer.from('hellooooooooooooo')) - done() - }) - ) - }) - - series([ - (cb) => utils.encodeMsg(3, - consts.type.NEW, - Buffer.from('chan1'), cb), - (cb) => utils.encodeMsg(3, - consts.type.IN_MESSAGE, - Buffer.from('hellooooooooooooo'), - cb), - (cb) => utils.encodeMsg(3, - consts.type.IN_CLOSE, - Buffer.from([]), - cb) - ], (err, msgs) => { - expect(err).to.not.exist() - pull( - pull.values(msgs), - plex - ) - }) - }) - - it('closing sender closes stream for writting, but allows reading from it', (done) => { + it('closing sender closes stream for writting, but allows reading', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -150,7 +126,6 @@ describe('channel', () => { const rcvrSrc = pushable() plex2.on('stream', (receiver) => { - pull( rcvrSrc, receiver @@ -161,26 +136,24 @@ describe('channel', () => { }) const sender = plex1.createStream() - sender.openChan(() => { - sndrSrc.end() - - pull( - sndrSrc, - sender, - pull.collect((err, data) => { - expect(err).to.not.exist() - expect(data[0].toString()).to.be.eql('Here ya go!') - done() - }) - ) - }) + sender.openChan() + sndrSrc.end() + pull( + sndrSrc, + sender, + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0].toString()).to.be.eql('Here ya go!') + done() + }) + ) }) - it('closing receiver closes stream for writting, but allows reading from it', (done) => { + it('closing receiver closes stream for writting, but allows reading', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -203,22 +176,21 @@ describe('channel', () => { }) const sender = plex1.createStream() - sender.openChan(() => { - pull( - sndrSrc, - sender - ) + sender.openChan() + pull( + sndrSrc, + sender + ) - sndrSrc.push('Here ya go!') // should be able to write to closed chan - sndrSrc.end() - }) + sndrSrc.push('Here ya go!') // should be able to write to closed chan + sndrSrc.end() }) it('closed sender should allow receiver to flush data', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -227,7 +199,6 @@ describe('channel', () => { const rcvrSrc = pushable() plex2.on('stream', (receiver) => { - pull( rcvrSrc, receiver, @@ -240,23 +211,21 @@ describe('channel', () => { }) const sender = plex1.createStream() - sender.openChan((err) => { - expect(err).to.not.exist() - sndrSrc.push('hello from sender!') - sndrSrc.end() + sender.openChan() + sndrSrc.push('hello from sender!') + sndrSrc.end() - pull( - sndrSrc, - sender - ) - }) + pull( + sndrSrc, + sender + ) }) it('should reset channels', (done) => { const p = pair() - const plex1 = new Mplex(true) - const plex2 = new Mplex(false) + const plex1 = new Plex(true) + const plex2 = new Plex(false) pull(plex1, p[0], plex1) pull(plex2, p[1], plex2) @@ -277,13 +246,42 @@ describe('channel', () => { const sndrSrc = pushable() const sender = plex1.createStream() const aborter = abortable() - sender.openChan((err) => { - expect(err).to.not.exist() - pull( - sndrSrc, - aborter, - sender - ) + sender.openChan() + pull( + sndrSrc, + aborter, + sender + ) + }) + + it('open a stream on both sides', (done) => { + const p = pair() + + const dialer = new Plex(true) + const listener = new Plex(false) + + pull(dialer, p[0], dialer) + pull(listener, p[1], listener) + + expect(6).check(done) + + dialer.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) }) + + const listenerConn = listener.createStream('listener') + listenerConn.openChan() + + listener.on('stream', (stream) => { + expect(stream).to.exist.mark() + closeAndWait(stream) + }) + + const dialerConn = dialer.createStream('dialer') + dialerConn.openChan() + + closeAndWait(dialerConn) + closeAndWait(listenerConn) }) }) diff --git a/test/mplex.spec.js b/test/mplex.spec.js deleted file mode 100644 index 88674fb..0000000 --- a/test/mplex.spec.js +++ /dev/null @@ -1,82 +0,0 @@ -/* eslint-env mocha */ -/* eslint max-nested-callbacks: ["error", 5] */ -'use strict' - -const chai = require('chai') -const dirtyChai = require('dirty-chai') -const expect = chai.expect -chai.use(dirtyChai) - -const pull = require('pull-stream') -const pair = require('pull-pair/duplex') -const pushable = require('pull-pushable') -const abortable = require('pull-abortable') - -const Mplex = require('../src') -const utils = require('../src/utils') -const consts = require('../src/consts') - -const series = require('async/series') - -describe('channel', () => { - it('should be writable', (done) => { - const plex = new Mplex(false) - - plex.on('stream', (stream) => { - pull(pull.values([Buffer.from('hellooooooooooooo')]), stream) - }) - - utils.encodeMsg(3, - consts.type.NEW, - Buffer.from('chan1'), - (err, msg) => { - expect(err).to.not.exist() - pull( - pull.values([msg]), - plex, - pull.drain((_data) => { - expect(err).to.not.exist() - utils.decodeMsg(_data, (err, data) => { - expect(err).to.not.exist() - const { id, type } = data[0] - expect(id).to.eql(3) - expect(type).to.eql(consts.type.IN_MESSAGE) - expect(data[1]).to.deep.eql(Buffer.from('hellooooooooooooo')) - done() - }) - }) - ) - }) - }) - - it('should be readable', (done) => { - const plex = new Mplex(true) - - plex.on('stream', (stream) => { - pull( - stream, - // drain, because otherwise we have to send an explicit close - pull.drain((data) => { - expect(data).to.deep.eql(Buffer.from('hellooooooooooooo')) - done() - }) - ) - }) - - series([ - (cb) => utils.encodeMsg(3, - consts.type.NEW, - Buffer.from('chan1'), cb), - (cb) => utils.encodeMsg(3, - consts.type.IN_MESSAGE, - Buffer.from('hellooooooooooooo'), - cb) - ], (err, msgs) => { - expect(err).to.not.exist() - pull( - pull.values(msgs), - plex - ) - }) - }) -}) diff --git a/test/plex.spec.js b/test/plex.spec.js new file mode 100644 index 0000000..2ed4ebe --- /dev/null +++ b/test/plex.spec.js @@ -0,0 +1,92 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const pull = require('pull-stream') +const pair = require('pull-pair/duplex') +const pushable = require('pull-pushable') +const abortable = require('pull-abortable') + +const Plex = require('../src') +const utils = require('../src/utils') +const consts = require('../src/consts') + +const series = require('async/series') + +describe('plex', () => { + // it('should be writable', (done) => { + // const plex = new Plex(false) + // + // plex.on('stream', (stream) => { + // pull(pull.values([Buffer.from('hellooooooooooooo')]), stream) + // }) + // + // utils.encodeMsg(3, + // consts.type.NEW, + // Buffer.from('chan1'), + // (err, msg) => { + // expect(err).to.not.exist() + // pull( + // pull.values([msg]), + // plex, + // pull.drain((_data) => { + // expect(err).to.not.exist() + // utils.decodeMsg(_data, (err, data) => { + // expect(err).to.not.exist() + // const { id, type } = data[0] + // expect(id).to.eql(3) + // expect(type).to.eql(consts.type.IN_MESSAGE) + // expect(data[1]).to.deep.eql(Buffer.from('hellooooooooooooo')) + // done() + // }) + // }) + // ) + // }) + // }) + // + // it('should be readable', (done) => { + // const plex = new Plex(true) + // + // plex.on('stream', (stream) => { + // pull( + // stream, + // // drain, because otherwise we have to send an explicit close + // pull.drain((data) => { + // expect(data).to.deep.eql(Buffer.from('hellooooooooooooo')) + // done() + // }) + // ) + // }) + // + // series([ + // (cb) => utils.encodeMsg(3, + // consts.type.NEW, + // Buffer.from('chan1'), cb), + // (cb) => utils.encodeMsg(3, + // consts.type.IN_MESSAGE, + // Buffer.from('hellooooooooooooo'), + // cb) + // ], (err, msgs) => { + // expect(err).to.not.exist() + // pull( + // pull.values(msgs), + // plex + // ) + // }) + // }) + + it(`channel id should be correct`, () => [1, 0].forEach((type) => { + const initiator = Boolean(type) + const plex = new Plex(initiator) + + const times = 10 + for (let i = 0; i < times; i++) { + expect(Boolean(plex._nextChanId() & 1)).to.be.eql(initiator) + } + })) +}) diff --git a/test/utils.spec.js b/test/utils.spec.js new file mode 100644 index 0000000..fc873f8 --- /dev/null +++ b/test/utils.spec.js @@ -0,0 +1,37 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 5] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(require('chai-checkmark')) +chai.use(dirtyChai) + +const pull = require('pull-stream') + +const utils = require('../src/utils') + +describe('utils', () => { + it('encodes header', () => { + pull( + pull.values([[17, 0, Buffer.from('17')]]), + utils.encode(), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0]).to.be.eql(Buffer.from('8801023137', 'hex')) + }) + ) + }) + + it('decodes header', () => { + pull( + pull.values([Buffer.from('8801023137', 'hex')]), + utils.decode(), + pull.collect((err, data) => { + expect(err).to.not.exist() + expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) + }) + ) + }) +}) \ No newline at end of file