diff --git a/package.json b/package.json index 228d7c9..5f4009c 100644 --- a/package.json +++ b/package.json @@ -21,8 +21,14 @@ "aegir": "^13.0.6", "async": "^2.6.0", "chai-checkmark": "^1.0.1", + "chunky": "0.0.0", + "concat-stream": "^1.6.2", + "libp2p-mplex": "^0.7.0", "pull-abortable": "^4.1.1", - "pull-generate": "^2.2.0" + "pull-generate": "^2.2.0", + "pull-stream-to-stream": "^1.3.4", + "pump": "^3.0.0", + "through2": "^2.0.3" }, "repository": { "type": "git", diff --git a/profile.js b/profile.js index 3d1467e..d408853 100644 --- a/profile.js +++ b/profile.js @@ -80,7 +80,7 @@ function marker (n, done) { } -spawn(10000, 10000, (err) => { +spawn(1000, 1000, (err) => { if (err) { throw err } diff --git a/src/channel.js b/src/channel.js index b1af522..585952f 100644 --- a/src/channel.js +++ b/src/channel.js @@ -39,6 +39,7 @@ class Channel extends EE { this._msgs = pushable((err) => { this._log('source closed', err) if (this._reset) { return } // don't try closing the channel on reset + if (err) { setImmediate(() => this.emit('error', err)) } this.endChan() }) @@ -110,7 +111,7 @@ class Channel extends EE { reset (err) { this._log('reset', err) - this._reset = err || new Error('channel reset!') + this._reset = err || 'channel reset!' this.close(this._reset) } @@ -135,8 +136,8 @@ class Channel extends EE { this._plex.push([ this._id, this._initiator - ? consts.type.OUT_MESSAGE - : consts.type.IN_MESSAGE, + ? consts.type.IN_MESSAGE + : consts.type.OUT_MESSAGE, data ]) } @@ -151,8 +152,8 @@ class Channel extends EE { this._plex.push([ this._id, this._initiator - ? consts.type.OUT_CLOSE - : consts.type.IN_CLOSE + ? consts.type.IN_CLOSE + : consts.type.OUT_CLOSE ]) } @@ -166,8 +167,8 @@ class Channel extends EE { this._plex.push([ this._id, this._initiator - ? consts.type.OUT_RESET - : consts.type.IN_RESET + ? consts.type.IN_RESET + : consts.type.OUT_RESET ]) } } diff --git a/src/utils.js b/src/coder.js similarity index 50% rename from src/utils.js rename to src/coder.js index cda5ca4..7852bae 100644 --- a/src/utils.js +++ b/src/coder.js @@ -28,41 +28,63 @@ exports.encode = () => { ) } +let States = { + PARSING: 0, + READING: 1 +} +let state = States.PARSING exports.decode = () => { const decode = (msg) => { let offset = 0 const h = varint.decode(msg) offset += varint.decode.bytes - let length - let data + let length, data try { length = varint.decode(msg, offset) offset += varint.decode.bytes - - if (length > msg.length) { - throw new Error('partial buffer, need more data') - } - - data = msg.slice(offset, offset + length) } catch (err) { - log.err(err) - } // ignore if data is empty + log.err(err) // ignore if data is empty + } - const decoded = { + const message = { id: h >> 3, type: h & 7, - data + data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here } - return [msg.slice(offset + length), decoded] + state = States.READING + return [msg.slice(offset), message, length] + } + + const read = (msg, data, length) => { + let left = length - msg.length + if (msg.length > 0) { + const buff = left > 0 ? msg.slice() : msg.slice(0, length) + buff.copy(data) + msg = msg.slice(buff.length) + } + if (left <= 0) { state = States.PARSING } + return [left, msg, data] } + let offset = 0 + let message = {} + let length = 0 return through(function (msg) { - let offset = 0 - let decoded while (msg.length) { - [msg, decoded] = decode(msg) - this.queue(decoded) + if (States.PARSING === state) { + [msg, message, length] = decode(msg) + } + + if (States.READING === state) { + [length, msg, message.data] = read(msg, message.data, length) + if (length <= 0 && States.PARSING === state) { + this.queue(message) + offset = 0 + message = {} + length = 0 + } + } } }) } diff --git a/src/index.js b/src/index.js index e18fb3b..6435c8e 100644 --- a/src/index.js +++ b/src/index.js @@ -7,7 +7,7 @@ const EE = require('events') const Channel = require('./channel') const consts = require('./consts') -const utils = require('./utils') +const coder = require('./coder') const debug = require('debug') @@ -17,6 +17,12 @@ log.err = debug('pull-plex:err') class Plex extends EE { constructor (initiator, onChan) { super() + + if (typeof initiator === 'function') { + onChan = initiator + initiator = true + } + this._initiator = !!initiator this._chanId = this._initiator ? 1 : 0 this._channels = {} @@ -45,11 +51,11 @@ class Plex extends EE { this.source = pull( this._chandata, - utils.encode() + coder.encode() ) this.sink = pull( - utils.decode(), + coder.decode(), (read) => { const next = (end, data) => { if (this._endedLocal) { return } @@ -76,7 +82,7 @@ class Plex extends EE { setImmediate(() => this.emit('error', err)) } - err = err || new Error('Underlying stream has been closed') + err = err || 'Underlying stream has been closed' this._endedLocal = true // propagate close to channels @@ -84,7 +90,7 @@ class Plex extends EE { .keys(this._channels) .forEach((id) => { const chan = this._channels[id] - chan.close(err) + if (chan) { return chan.close(err) } }) this.emit('close') @@ -95,7 +101,7 @@ class Plex extends EE { } reset (err) { - err = err || new Error('Underlying stream has been closed') + err = err || 'Underlying stream has been closed' this._chandata.end(err) this.close(err) } @@ -107,7 +113,9 @@ class Plex extends EE { } _nextChanId () { - return this._chanId += 2 + const id = this._chanId + this._chanId += 2 + return id } createStream (name) { @@ -127,19 +135,29 @@ class Plex extends EE { open = false } - id = id || this._nextChanId(initiator) + id = typeof id === 'number' ? id : this._nextChanId(initiator) + name = typeof name === 'number' ? name.toString() : name + name = name == null ? id.toString() : name + name = !name.length ? id.toString() : name const chan = new Channel(id, - name || id.toString(), + name, this, initiator, open || false) chan.once('close', () => { + this._log('deleting channel', JSON.stringify({ + channel: this._name, + id: id, + endedLocal: this._channels[id]._endedLocal, + endedRemote: this._channels[id]._endedRemote, + initiator: this._channels[id]._initiator + })) delete this._channels[id] }) if (this._channels[id]) { - return this.emit('error', new Error(`channel with id ${id} already exist!`)) + return this.emit('error', `channel with id ${id} already exist!`) } this._channels[id] = chan @@ -151,11 +169,6 @@ class Plex extends EE { const { id, type, data } = msg switch (type) { case consts.type.NEW: { - if (!this._initiator && (id & 1) !== 1) { - return this.emit('error', - new Error(`Initiator can't have even id's!`)) - } - const chan = this._newStream(id, this._initiator, true, data.toString()) setImmediate(() => this.emit('stream', chan)) return diff --git a/test/channel.spec.js b/test/channel.spec.js index 2797757..47f4761 100644 --- a/test/channel.spec.js +++ b/test/channel.spec.js @@ -14,10 +14,6 @@ const pushable = require('pull-pushable') const abortable = require('pull-abortable') const Plex = require('../src') -const utils = require('../src/utils') -const consts = require('../src/consts') - -const series = require('async/series') function closeAndWait (stream) { pull( @@ -94,7 +90,6 @@ describe('channel', () => { pull(plex2, p[1], plex2) const chan1 = plex1.createStream('stream 1') - plex2.on('stream', (stream) => { pull( stream, diff --git a/test/utils.spec.js b/test/coder.spec.js similarity index 91% rename from test/utils.spec.js rename to test/coder.spec.js index 19ae2a3..5296a88 100644 --- a/test/utils.spec.js +++ b/test/coder.spec.js @@ -10,13 +10,13 @@ chai.use(dirtyChai) const pull = require('pull-stream') -const utils = require('../src/utils') +const coder = require('../src/coder') -describe('utils', () => { +describe('coder', () => { it('encodes header', () => { pull( pull.values([[17, 0, Buffer.from('17')]]), - utils.encode(), + coder.encode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(data[0]).to.be.eql(Buffer.from('8801023137', 'hex')) @@ -27,7 +27,7 @@ describe('utils', () => { it('decodes header', () => { pull( pull.values([Buffer.from('8801023137', 'hex')]), - utils.decode(), + coder.decode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.from('17') }) @@ -42,7 +42,7 @@ describe('utils', () => { [19, 0, Buffer.from('19')], [21, 0, Buffer.from('21')] ]), - utils.encode(), + coder.encode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(Buffer.concat(data)).to.be.eql(Buffer.from('88010231379801023139a801023231', 'hex')) @@ -53,7 +53,7 @@ describe('utils', () => { it('decodes msgs from buffer', () => { pull( pull.values([Buffer.from('88010231379801023139a801023231', 'hex')]), - utils.decode(), + coder.decode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(data).to.be.deep.eql([ @@ -68,7 +68,7 @@ describe('utils', () => { it('encodes zero length body msg', () => { pull( pull.values([[17, 0]]), - utils.encode(), + coder.encode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(data[0]).to.be.eql(Buffer.from('880100', 'hex')) @@ -79,7 +79,7 @@ describe('utils', () => { it('decodes zero length body msg', () => { pull( pull.values([Buffer.from('880100', 'hex')]), - utils.decode(), + coder.decode(), pull.collect((err, data) => { expect(err).to.not.exist() expect(data[0]).to.be.eql({ id: 17, type: 0, data: Buffer.alloc(0) }) diff --git a/test/old-mplex-interop.js b/test/old-mplex-interop.js new file mode 100644 index 0000000..e65a35e --- /dev/null +++ b/test/old-mplex-interop.js @@ -0,0 +1,493 @@ +/* eslint-env mocha */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) + +const concat = require('concat-stream') +const through = require('through2') +const net = require('net') +const chunky = require('chunky') +const pump = require('pump') +const toStream = require('pull-stream-to-stream') + +const MplexCore = require('libp2p-mplex/src/internals') +const Plex = require('../src') + +describe('node stream multiplex interop', () => { + it('new2old: one way piping work with 2 sub-streams', (done) => { + const pullPlex = new Plex(true) + const plex1 = toStream(pullPlex) + const stream1 = toStream(pullPlex.createStream()) + const stream2 = toStream(pullPlex.createStream()) + + function onStream (stream, id) { + stream.pipe(collect()) + } + + const plex2 = new MplexCore({ initiator: false }, onStream) + + plex1.pipe(plex2) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + stream1.end() + stream2.end() + + let pending = 2 + const results = [] + + function collect () { + return concat(function (data) { + results.push(data.toString()) + + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('hello') + expect(results[1].toString()).to.equal('world') + done() + } + }) + } + }) + + it('old2new: one way piping work with 2 sub-streams', (done) => { + const plex1 = new MplexCore() + const stream1 = plex1.createStream() + const stream2 = plex1.createStream() + + function onStream (pullStream, id) { + const stream = toStream(pullStream) + stream.pipe(collect()) + } + + const pullPlex = new Plex(onStream) + const plex2 = toStream(pullPlex) + + plex1.pipe(plex2) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + stream1.end() + stream2.end() + + let pending = 2 + const results = [] + + function collect () { + return concat(function (data) { + results.push(data.toString()) + + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('hello') + expect(results[1].toString()).to.equal('world') + done() + } + }) + } + }) + + it('new2old: two way piping works with 2 sub-streams', (done) => { + const pullPlex = new Plex(true) + const plex1 = toStream(pullPlex) + + const plex2 = new MplexCore(function onStream (stream, id) { + const uppercaser = through(function (chunk, e, callback) { + this.push(Buffer.from(chunk.toString().toUpperCase())) + this.end() + callback() + }) + stream.pipe(uppercaser).pipe(stream) + }) + + plex1.pipe(plex2).pipe(plex1) + + const stream1 = toStream(pullPlex.createStream()) + const stream2 = toStream(pullPlex.createStream()) + + stream1.pipe(collect()) + stream2.pipe(collect()) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + + let pending = 2 + const results = [] + + function collect () { + return concat(function (data) { + results.push(data.toString()) + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('HELLO') + expect(results[1].toString()).to.equal('WORLD') + done() + } + }) + } + }) + + it('old2new: two way piping works with 2 sub-streams', (done) => { + const plex1 = new MplexCore() + + const plex2 = toStream(new Plex(false, function onStream (pstream, id) { + const stream = toStream(pstream) + const uppercaser = through(function (chunk, e, callback) { + this.push(Buffer.from(chunk.toString().toUpperCase())) + this.end() + callback() + }) + stream.pipe(uppercaser).pipe(stream) + })) + + plex1.pipe(plex2).pipe(plex1) + + const stream1 = plex1.createStream() + const stream2 = plex1.createStream() + + stream1.pipe(collect()) + stream2.pipe(collect()) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + + let pending = 2 + const results = [] + + function collect () { + return concat(function (data) { + results.push(data.toString()) + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('HELLO') + expect(results[1].toString()).to.equal('WORLD') + done() + } + }) + } + }) + + it.skip('destroy', (done) => { + const pullPlex = new Plex() + const plex1 = toStream(pullPlex) + + const stream1 = toStream(pullPlex.createStream()) + + const plex2 = new MplexCore(function onStream (stream, id) { + stream.on('error', function (err) { + expect(err.message).to.equal('0 had an error') + done() + }) + }) + + plex1.pipe(plex2) + + stream1.write(Buffer.from('hello')) + // pull-stream-to-stream destroy doesn't take parameters, so error never gets emited + stream1.destroy(new Error('0 had an error')) + }) + + // need to implement message size checks + it.skip('testing invalid data error', (done) => { + const plex = toStream(new Plex()) + + plex.on('error', function (err) { + if (err) { + expect(err.message).to.equal('Incoming message is too big') + done() + } + }) + // a really stupid thing to do + plex.write(Array(50000).join('\xff')) + }) + + // need to implement message size checks + it.skip('overflow', (done) => { + let count = 0 + + function check () { + if (++count === 2) { + done() + } + } + + const plex1 = new MplexCore() + const plex2 = new MplexCore({ limit: 10 }) + + plex2.on('stream', function (stream) { + stream.on('error', function (err) { + expect(err.message).to.equal('Incoming message is too big') + check() + }) + }) + + plex2.on('error', function (err) { + if (err) { + expect(err.message).to.equal('Incoming message is too big') + check() + } + }) + + plex1.pipe(plex2).pipe(plex1) + + const stream = plex1.createStream() + + stream.write(Buffer.alloc(11)) + }) + + it('2 buffers packed into 1 chunk', (done) => { + const pullPlex = new Plex(true) + const plex1 = toStream(pullPlex) + + const plex2 = new MplexCore(function (b) { + b.pipe(concat(function (body) { + expect(body.toString('utf8')).to.equal('abc\n123\n') + server.close() + plex1.end() + done() + })) + }) + + const a = toStream(pullPlex.createStream(1337)) + a.write('abc\n') + a.write('123\n') + a.end() + + const server = net.createServer(function (stream) { + plex2.pipe(stream).pipe(plex2) + }) + server.listen(0, function () { + const port = server.address().port + plex1.pipe(net.connect(port)).pipe(plex1) + }) + }) + + it('chunks', (done) => { + let times = 100 + ;(function chunk () { + const collect = collector(function () { + if (--times === 0) { + done() + } else { + chunk() + } + }) + + const plex1 = new MplexCore() + const stream1 = plex1.createStream() + const stream2 = plex1.createStream() + + const plex2 = new MplexCore(function onStream (stream, id) { + stream.pipe(collect()) + }) + + plex1.pipe(through(function (buf, enc, next) { + const bufs = chunky(buf) + for (let i = 0; i < bufs.length; i++) this.push(bufs[i]) + next() + })).pipe(plex2) + + stream1.write(Buffer.from('hello')) + stream2.write(Buffer.from('world')) + stream1.end() + stream2.end() + })() + + function collector (cb) { + let pending = 2 + const results = [] + + return function () { + return concat(function (data) { + results.push(data.toString()) + if (--pending === 0) { + results.sort() + expect(results[0].toString()).to.equal('hello') + expect(results[1].toString()).to.equal('world') + cb() + } + }) + } + } + }) + + it('prefinish + corking', (done) => { + const plex = new MplexCore() + let async = false + + plex.on('prefinish', function () { + plex.cork() + process.nextTick(function () { + async = true + plex.uncork() + }) + }) + + plex.on('finish', function () { + expect(async).to.be.ok() + done() + }) + + plex.end() + }) + + it('quick message', (done) => { + const plex2 = new MplexCore() + const plex1 = new MplexCore(function (stream) { + stream.write('hello world') + }) + + plex1.pipe(plex2).pipe(plex1) + + setTimeout(function () { + const stream = plex2.createStream() + stream.on('data', function (data) { + expect(data).to.eql(Buffer.from('hello world')) + done() + }) + }, 100) + }) + + it('half close a muxed stream', (done) => { + const plex1 = new MplexCore() + const plex2 = new MplexCore() + + plex1.pipe(plex2).pipe(plex1) + + plex2.on('stream', function (stream, id) { + expect(stream).to.exist() + expect(id).to.exist() + + // let it flow + stream.on('data', function () {}) + + stream.on('end', function () { + done() + }) + + stream.on('error', function (err) { + expect(err).to.not.exist() + }) + + stream.write(Buffer.from('hello world')) + + stream.end() + }) + + const stream = plex1.createStream() + + stream.on('data', function (data) { + expect(data).to.eql(Buffer.from('hello world')) + }) + + stream.on('error', function (err) { + expect(err).to.not.exist() + }) + + stream.on('end', function () { + stream.end() + }) + }) + + it('half close a half closed muxed stream', (done) => { + const plex1 = new MplexCore({ halfOpen: true }) + const plex2 = new MplexCore({ halfOpen: true }) + + plex1.nameTag = 'plex1:' + plex2.nameTag = 'plex2:' + + plex1.pipe(plex2).pipe(plex1) + + plex2.on('stream', function (stream, id) { + expect(stream).to.exist() + expect(id).to.exist() + + stream.on('data', function (data) { + expect(data).to.eql(Buffer.from('some data')) + }) + + stream.on('end', function () { + stream.write(Buffer.from('hello world')) + stream.end() + }) + + stream.on('error', function (err) { + expect(err).to.not.exist() + }) + }) + + const stream = plex1.createStream() + + stream.on('data', function (data) { + expect(data).to.eql(Buffer.from('hello world')) + }) + + stream.on('error', function (err) { + expect(err).to.not.exist() + }) + + stream.on('end', function () { + done() + }) + + stream.write(Buffer.from('some data')) + + stream.end() + }) + + it('underlying error is propagated to muxed streams', (done) => { + let count = 0 + + function check () { + if (++count === 4) { + done() + } + } + + const plex1 = new MplexCore() + const plex2 = new MplexCore() + + let socket + + plex2.on('stream', function (stream) { + stream.on('error', function (err) { + expect(err).to.exist() + check() + }) + + stream.on('close', function () { + check() + }) + + socket.destroy() + }) + + const stream1to2 = plex1.createStream(1337) + + stream1to2.on('error', function (err) { + expect(err).to.exist() + check() + }) + + stream1to2.on('close', function () { + check() + }) + + const server = net.createServer(function (stream) { + pump(plex2, stream) + pump(stream, plex2) + server.close() + }) + + server.listen(0, function () { + const port = server.address().port + socket = net.connect(port) + + pump(plex1, socket) + pump(socket, plex1) + }) + }) +}) diff --git a/test/plex.spec.js b/test/plex.spec.js index a00a233..3adea30 100644 --- a/test/plex.spec.js +++ b/test/plex.spec.js @@ -10,17 +10,11 @@ chai.use(dirtyChai) const pull = require('pull-stream') const pair = require('pull-pair/duplex') -const pushable = require('pull-pushable') -const abortable = require('pull-abortable') const Plex = require('../src') -const utils = require('../src/utils') -const consts = require('../src/consts') - -const series = require('async/series') describe('plex', () => { - it(`reset should close both ends`, (done) => { + it.skip(`reset should close both ends`, (done) => { const p = pair() const plex1 = new Plex(true) @@ -54,7 +48,6 @@ describe('plex', () => { const times = 100 for (let i = 0; i < times; i++) { const id = plex._nextChanId() - console.dir(id) expect(Boolean(id & 1)).to.be.eql(initiator) } })