Skip to content

Commit

Permalink
use parser.execute
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 26, 2021
1 parent 0e4ca4d commit 3fecee9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
13 changes: 7 additions & 6 deletions lib/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ class RequestHandler extends AsyncResource {
onComplete (trailers) {
const { res } = this

removeSignal(this)

if (trailers) {
util.parseHeaders(trailers, this.trailers)
}
util.queueMicrotask(() => {
removeSignal(this)

res.push(null)
if (trailers) {
util.parseHeaders(trailers, this.trailers)
}
res.push(null)
})
}

onError (err) {
Expand Down
35 changes: 20 additions & 15 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ class Parser extends HTTPParser {

socketPause(this.socket)
}

socket.on('data', onSocketData)
}

[HTTPParser.kOnHeaders] (rawHeaders) {
Expand All @@ -439,7 +441,7 @@ class Parser extends HTTPParser {
}
}

[HTTPParser.kOnExecute] (ret) {
[HTTPParser.kOnExecute] (ret, currentBuffer) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnExecute], ret])
return
Expand Down Expand Up @@ -480,17 +482,18 @@ class Parser extends HTTPParser {
// Reset socket state to non flowing:
socket._readableState.flowing = null

socket.unshift(this.getCurrentBuffer().slice(ret))
socket.unshift(currentBuffer.slice(ret))

try {
request.onUpgrade(statusCode, headers, socket)

if (!socket.destroyed && !request.aborted) {
detachSocket(socket)

client[kSocket] = null

client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))

request.onUpgrade(statusCode, headers, socket)
}

resume(client)
Expand Down Expand Up @@ -551,7 +554,6 @@ class Parser extends HTTPParser {
this.request = request

if (request.upgrade) {
this.unconsume()
this.upgrade = true
return 2
}
Expand Down Expand Up @@ -723,7 +725,6 @@ class Parser extends HTTPParser {
destroy () {
clearTimeout(this.timeout)
this.timeout = null
this.unconsume()
setImmediate((self) => self.close(), this)
}
}
Expand Down Expand Up @@ -769,6 +770,18 @@ function onSocketError (err) {
}
}

function onSocketData (data) {
const { [kParser]: parser } = this

let ret
try {
ret = parser.execute(data)
} catch (err) {
ret = err
}
parser[HTTPParser.kOnExecute](ret, data)
}

function onSocketEnd () {
util.destroy(this, new SocketError('other side closed'))
}
Expand All @@ -787,6 +800,7 @@ function detachSocket (socket) {
.removeListener('session', onSocketSession)
.removeListener('error', onSocketError)
.removeListener('end', onSocketEnd)
.removeListener('data', onSocketData)
.removeListener('close', onSocketClose)
}

Expand Down Expand Up @@ -862,15 +876,6 @@ function connect (client) {

const parser = new Parser(client, socket)

/* istanbul ignore next */
if (nodeMajorVersion >= 12) {
assert(socket._handle)
parser.consume(socket._handle)
} else {
assert(socket._handle && socket._handle._externalStream)
parser.consume(socket._handle._externalStream)
}

socket[kIdleTimeout] = null
socket[kIdleTimeoutValue] = null
socket[kWriting] = false
Expand Down
2 changes: 1 addition & 1 deletion test/socket-handle-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ test('resume error', (t) => {
client[kSocket]._handle.readStart = () => -100

data.body.on('error', (err) => {
t.strictEqual(err.code, -100)
t.strictEqual(err.code, 'EPROTO')
})

setTimeout(() => {
Expand Down

0 comments on commit 3fecee9

Please sign in to comment.