diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 0cb6528ae06..62a3e29ef24 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -49,7 +49,8 @@ const { kMaxResponseSize, kListeners, kOnError, - kResume + kResume, + kHTTPContext } = require('../core/symbols.js') const constants = require('../llhttp/constants.js') @@ -403,6 +404,7 @@ class Parser { removeAllListeners(socket) client[kSocket] = null + client[kHTTPContext] = null // TODO (fix): This is hacky... client[kQueue][client[kRunningIdx]++] = null client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) @@ -643,6 +645,8 @@ function onParserTimeout (parser) { } async function connectH1 (client, socket) { + client[kSocket] = socket + if (!llhttpInstance) { llhttpInstance = await llhttpPromise llhttpPromise = null @@ -706,6 +710,7 @@ async function connectH1 (client, socket) { const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) client[kSocket] = null + client[kHTTPContext] = null // TODO (fix): This is hacky... if (client.destroyed) { assert(client[kPending] === 0) @@ -733,6 +738,11 @@ async function connectH1 (client, socket) { client[kResume]() }) + let closed = false + socket.on('close', () => { + closed = true + }) + return { version: 'h1', defaultPipelining: 1, @@ -742,38 +752,48 @@ async function connectH1 (client, socket) { resume () { resumeH1(client) }, - destroy () { + destroy (err, callback) { + if (closed) { + queueMicrotask(callback) + } else { + socket.destroy(err).on('close', callback) + } + }, + get destroyed () { + return socket.destroyed }, busy (request) { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return true } - if (client[kRunning] > 0 && !request.idempotent) { - // Non-idempotent request cannot be retried. - // Ensure that no other requests are inflight and - // could cause failure. - return true - } - - if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { - // Don't dispatch an upgrade until all preceding requests have completed. - // A misbehaving server might upgrade the connection before all pipelined - // request has completed. - return true - } + if (request) { + if (client[kRunning] > 0 && !request.idempotent) { + // Non-idempotent request cannot be retried. + // Ensure that no other requests are inflight and + // could cause failure. + return true + } - if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && - (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) { - // Request with stream or iterator body can error while other requests - // are inflight and indirectly error those as well. - // Ensure this doesn't happen by waiting for inflight - // to complete before dispatching. + if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { + // Don't dispatch an upgrade until all preceding requests have completed. + // A misbehaving server might upgrade the connection before all pipelined + // request has completed. + return true + } - // Request with stream or iterator body cannot be retried. - // Ensure that no other requests are inflight and - // could cause failure. - return true + if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && + (util.isStream(request.body) || util.isAsyncIterable(request.body) || util.isFormDataLike(request.body))) { + // Request with stream or iterator body can error while other requests + // are inflight and indirectly error those as well. + // Ensure this doesn't happen by waiting for inflight + // to complete before dispatching. + + // Request with stream or iterator body cannot be retried. + // Ensure that no other requests are inflight and + // could cause failure. + return true + } } return false diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index dcd250254bf..8155d6e226a 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -55,6 +55,8 @@ const { } = http2 async function connectH2 (client, socket) { + client[kSocket] = socket + if (!h2ExperimentalWarned) { h2ExperimentalWarned = true process.emitWarning('H2 support is experimental, expect them to change at any time.', { @@ -114,6 +116,11 @@ async function connectH2 (client, socket) { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) + let closed = false + socket.on('close', () => { + closed = true + }) + return { version: 'h2', defaultPipelining: Infinity, @@ -124,8 +131,16 @@ async function connectH2 (client, socket) { resume () { }, - destroy (err) { + destroy (err, callback) { session.destroy(err) + if (closed) { + queueMicrotask(callback) + } else { + socket.destroy(err).on('close', callback) + } + }, + get destroyed () { + return socket.destroyed }, busy () { return false diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 7c75c2ff239..8cc334d0baf 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -17,17 +17,14 @@ const { const buildConnector = require('../core/connect.js') const { kUrl, - kReset, kServerName, kClient, kBusy, kConnect, - kBlocking, kResuming, kRunning, kPending, kSize, - kWriting, kQueue, kConnected, kConnecting, @@ -38,7 +35,6 @@ const { kRunningIdx, kError, kPipelining, - kSocket, kKeepAliveTimeoutValue, kMaxHeadersSize, kKeepAliveMaxTimeout, @@ -216,7 +212,6 @@ class Client extends DispatcherBase { : [createRedirectInterceptor({ maxRedirections })] this[kUrl] = util.parseOrigin(url) this[kConnector] = connect - this[kSocket] = null this[kPipelining] = pipelining != null ? pipelining : 1 this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout @@ -277,13 +272,12 @@ class Client extends DispatcherBase { } get [kConnected] () { - return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed + return !!this[kHTTPContext] && !this[kConnecting] && !this[kHTTPContext].destroyed } get [kBusy] () { - const socket = this[kSocket] - return ( - (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) || + return Boolean( + this[kHTTPContext]?.busy(null) || (this[kSize] >= (getPipelining(this) || 1)) || this[kPending] > 0 ) @@ -346,13 +340,9 @@ class Client extends DispatcherBase { resolve(null) } - if (this[kHTTPContext] != null) { - this[kHTTPContext].destroy(err) + if (this[kHTTPContext]) { + this[kHTTPContext].destroy(err, callback) this[kHTTPContext] = null - } - - if (this[kSocket]) { - this[kSocket].destroy(err).on('close', callback) } else { queueMicrotask(callback) } @@ -386,7 +376,7 @@ function onError (client, err) { async function connect (client) { assert(!client[kConnecting]) - assert(!client[kSocket]) + assert(!client[kHTTPContext]) let { host, hostname, protocol, port } = client[kUrl] @@ -441,21 +431,24 @@ async function connect (client) { return } - client[kConnecting] = false - assert(socket) - client[kHTTPContext] = socket.alpnProtocol === 'h2' - ? await connectH2(client, socket) - : await connectH1(client, socket) + try { + client[kHTTPContext] = socket.alpnProtocol === 'h2' + ? await connectH2(client, socket) + : await connectH1(client, socket) + } catch (err) { + socket.destroy().on('error', () => {}) + throw err + } + + client[kConnecting] = false socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] socket[kClient] = client socket[kError] = null - client[kSocket] = socket - if (channels.connected.hasSubscribers) { channels.connected.publish({ connectParams: { @@ -546,8 +539,6 @@ function _resume (client, sync) { return } - const socket = client[kSocket] - if (client[kHTTPContext]) { client[kHTTPContext].resume() } @@ -580,27 +571,19 @@ function _resume (client, sync) { } client[kServerName] = request.servername - - if (socket && socket.servername !== request.servername) { - util.destroy(socket, new InformationalError('servername changed')) - return - } + client[kHTTPContext]?.destroy(new InformationalError('servername changed')) } if (client[kConnecting]) { return } - if (!socket) { + if (!client[kHTTPContext]) { connect(client) return } - if (socket.destroyed) { - return - } - - if (!client[kHTTPContext]) { + if (client[kHTTPContext].destroyed) { return }