From 3fecee9124a4c5430a77d7fca4c7d5c7bfed4645 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sat, 27 Feb 2021 00:52:27 +0100 Subject: [PATCH] use parser.execute --- lib/client-request.js | 13 +++++++------ lib/core/client.js | 35 ++++++++++++++++++++--------------- test/socket-handle-error.js | 2 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/lib/client-request.js b/lib/client-request.js index 7c65b76cb37..5c438bd0fad 100644 --- a/lib/client-request.js +++ b/lib/client-request.js @@ -112,13 +112,14 @@ class RequestHandler extends AsyncResource { onComplete (trailers) { const { res } = this - removeSignal(this) - - if (trailers) { - util.parseHeaders(trailers, this.trailers) - } + util.queueMicrotask(() => { + removeSignal(this) - res.push(null) + if (trailers) { + util.parseHeaders(trailers, this.trailers) + } + res.push(null) + }) } onError (err) { diff --git a/lib/core/client.js b/lib/core/client.js index 37eee720b19..38fdaf4c9aa 100644 --- a/lib/core/client.js +++ b/lib/core/client.js @@ -423,6 +423,8 @@ class Parser extends HTTPParser { socketPause(this.socket) } + + socket.on('data', onSocketData) } [HTTPParser.kOnHeaders] (rawHeaders) { @@ -439,7 +441,7 @@ class Parser extends HTTPParser { } } - [HTTPParser.kOnExecute] (ret) { + [HTTPParser.kOnExecute] (ret, currentBuffer) { if (this.paused) { this.queue.push([this[HTTPParser.kOnExecute], ret]) return @@ -480,17 +482,18 @@ class Parser extends HTTPParser { // Reset socket state to non flowing: socket._readableState.flowing = null - socket.unshift(this.getCurrentBuffer().slice(ret)) + socket.unshift(currentBuffer.slice(ret)) try { - request.onUpgrade(statusCode, headers, socket) - if (!socket.destroyed && !request.aborted) { detachSocket(socket) + client[kSocket] = null client[kQueue][client[kRunningIdx]++] = null client.emit('disconnect', client, new InformationalError('upgrade')) + + request.onUpgrade(statusCode, headers, socket) } resume(client) @@ -551,7 +554,6 @@ class Parser extends HTTPParser { this.request = request if (request.upgrade) { - this.unconsume() this.upgrade = true return 2 } @@ -723,7 +725,6 @@ class Parser extends HTTPParser { destroy () { clearTimeout(this.timeout) this.timeout = null - this.unconsume() setImmediate((self) => self.close(), this) } } @@ -769,6 +770,18 @@ function onSocketError (err) { } } +function onSocketData (data) { + const { [kParser]: parser } = this + + let ret + try { + ret = parser.execute(data) + } catch (err) { + ret = err + } + parser[HTTPParser.kOnExecute](ret, data) +} + function onSocketEnd () { util.destroy(this, new SocketError('other side closed')) } @@ -787,6 +800,7 @@ function detachSocket (socket) { .removeListener('session', onSocketSession) .removeListener('error', onSocketError) .removeListener('end', onSocketEnd) + .removeListener('data', onSocketData) .removeListener('close', onSocketClose) } @@ -862,15 +876,6 @@ function connect (client) { const parser = new Parser(client, socket) - /* istanbul ignore next */ - if (nodeMajorVersion >= 12) { - assert(socket._handle) - parser.consume(socket._handle) - } else { - assert(socket._handle && socket._handle._externalStream) - parser.consume(socket._handle._externalStream) - } - socket[kIdleTimeout] = null socket[kIdleTimeoutValue] = null socket[kWriting] = false diff --git a/test/socket-handle-error.js b/test/socket-handle-error.js index 951a03ea97f..a2558ba275e 100644 --- a/test/socket-handle-error.js +++ b/test/socket-handle-error.js @@ -59,7 +59,7 @@ test('resume error', (t) => { client[kSocket]._handle.readStart = () => -100 data.body.on('error', (err) => { - t.strictEqual(err.code, -100) + t.strictEqual(err.code, 'EPROTO') }) setTimeout(() => {