Skip to content

Commit

Permalink
feat: use bl to avoid buffer concat/copy
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 90aac16 commit 018be66
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 11 deletions.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@
"url": "git+https://github.com/dryajov/pull-plex.git"
},
"dependencies": {
"bl": "^1.2.2",
"buffer-reuse-pool": "^1.0.0",
"chai": "^4.1.2",
"debug": "^3.1.0",
"dirty-chai": "^2.0.1",
"interface-connection": "^0.3.2",
"lodash.defaults": "^4.2.0",
"pull-batch": "^1.0.0",
"pull-cat": "^1.1.11",
"pull-defer": "^0.2.2",
Expand Down
26 changes: 15 additions & 11 deletions src/coder.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
const pull = require('pull-stream')
const varint = require('varint')
const through = require('pull-through')
const reuse = require('buffer-reuse-pool')
const BufferList = require('bl')

const pool = reuse.pool(1024 * 1024)

const debug = require('debug')

const log = debug('pull-plex:utils')
log.err = debug('pull-plex:utils:err')
const log = debug('pull-plex:coder')
log.err = debug('pull-plex:coder:err')

exports.encode = () => {
return pull(
Expand Down Expand Up @@ -35,20 +39,20 @@ exports.decode = () => {
let message = null
let length = 0
let buffer = null
let pos = 0


const decode = (msg) => {
try {
let offset = 0
let length = 0
const h = varint.decode(msg)
let buff = msg.slice()
const h = varint.decode(buff) // no bl[x] accessor :(
offset += varint.decode.bytes
length = varint.decode(msg, offset)
length = varint.decode(buff, offset)
offset += varint.decode.bytes
const message = {
id: h >> 3,
type: h & 7,
data: Buffer.alloc(length) // instead of allocating a new buff use a mem pool here
data: new BufferList() // instead of allocating a new buff use a mem pool here
}

state = States.READING
Expand All @@ -69,7 +73,7 @@ exports.decode = () => {
if (left < 0) { left = 0 }
if (msg.length > 0) {
const buff = msg.slice(0, length - left)
pos += buff.copy(data, pos)
data.append(buff)
msg = msg.slice(buff.length)
}
if (left <= 0) { state = States.PARSING }
Expand All @@ -80,9 +84,9 @@ exports.decode = () => {
while (msg.length) {
if (States.PARSING === state) {
if (!buffer) {
buffer = Buffer.from(msg)
buffer = new BufferList(msg)
} else {
buffer = Buffer.concat([buffer, msg])
buffer = buffer.append(msg)
}

[msg, message, length] = decode(buffer)
Expand All @@ -95,11 +99,11 @@ exports.decode = () => {
if (States.READING === state) {
[length, msg, message.data] = read(msg, message.data, length)
if (length <= 0 && States.PARSING === state) {
message.data = message.data.slice() // get new buffer
this.queue(message)
offset = 0
message = null
length = 0
pos = 0
}
}
}
Expand Down

0 comments on commit 018be66

Please sign in to comment.