diff --git a/package.json b/package.json index 5f4009c..dcf4715 100644 --- a/package.json +++ b/package.json @@ -35,10 +35,13 @@ "url": "git+https://github.com/dryajov/pull-plex.git" }, "dependencies": { + "bl": "^1.2.2", + "buffer-reuse-pool": "^1.0.0", "chai": "^4.1.2", "debug": "^3.1.0", "dirty-chai": "^2.0.1", "interface-connection": "^0.3.2", + "lodash.defaults": "^4.2.0", "pull-batch": "^1.0.0", "pull-cat": "^1.1.11", "pull-defer": "^0.2.2", diff --git a/src/coder.js b/src/coder.js index 0a3c8e0..7af38e4 100644 --- a/src/coder.js +++ b/src/coder.js @@ -3,11 +3,15 @@ const pull = require('pull-stream') const varint = require('varint') const through = require('pull-through') +const reuse = require('buffer-reuse-pool') +const BufferList = require('bl') + +const pool = reuse.pool(1024 * 1024) const debug = require('debug') -const log = debug('pull-plex:utils') -log.err = debug('pull-plex:utils:err') +const log = debug('pull-plex:coder') +log.err = debug('pull-plex:coder:err') exports.encode = () => { return pull( @@ -35,20 +39,20 @@ exports.decode = () => { let message = null let length = 0 let buffer = null - let pos = 0 - + const decode = (msg) => { try { let offset = 0 let length = 0 - const h = varint.decode(msg) + let buff = msg.slice() + const h = varint.decode(buff) // no bl[x] accessor :( offset += varint.decode.bytes - length = varint.decode(msg, offset) + length = varint.decode(buff, offset) offset += varint.decode.bytes const message = { id: h >> 3, type: h & 7, - data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here + data: new BufferList() // instead of allocating a new buff use a mem pool here } state = States.READING @@ -69,7 +73,7 @@ exports.decode = () => { if (left < 0) { left = 0 } if (msg.length > 0) { const buff = msg.slice(0, length - left) - pos += buff.copy(data, pos) + data.append(buff) msg = msg.slice(buff.length) } if (left <= 0) { state = States.PARSING } @@ -80,9 +84,9 @@ exports.decode = () => { while (msg.length) { if (States.PARSING === state) { if (!buffer) { - buffer = Buffer.from(msg) + buffer = new BufferList(msg) } else { - buffer = Buffer.concat([buffer, msg]) + buffer = buffer.append(msg) } [msg, message, length] = decode(buffer) @@ -95,11 +99,11 @@ exports.decode = () => { if (States.READING === state) { [length, msg, message.data] = read(msg, message.data, length) if (length <= 0 && States.PARSING === state) { + message.data = message.data.slice() // get new buffer this.queue(message) offset = 0 message = null length = 0 - pos = 0 } } }