From 7b60d86d5d8e27c3b7d9eb67f173485f80aed742 Mon Sep 17 00:00:00 2001 From: ywave620 <60539365+ywave620@users.noreply.github.com> Date: Wed, 27 Jul 2022 17:09:06 +0800 Subject: [PATCH] http: reuse socket only when it is drained Ensuring every request is assigned to a drained socket or nothing. Because is has no benifit for a request to be attached to a non drained socket and it prevents the request from being assigned to a drained one, which might occur soon or already in the free pool We achieve this by claiming a socket as free only when the socket is drained. PR-URL: https://github.com/nodejs/node/pull/43902 Reviewed-By: Matteo Collina Reviewed-By: Paolo Insogna Reviewed-By: Robert Nagy --- lib/_http_client.js | 12 +- lib/_http_outgoing.js | 4 +- ...st-http-agent-reuse-drained-socket-only.js | 122 ++++++++++++++++++ 3 files changed, 134 insertions(+), 4 deletions(-) create mode 100644 test/parallel/test-http-agent-reuse-drained-socket-only.js diff --git a/lib/_http_client.js b/lib/_http_client.js index 9c50af5a57e7d1..d66af7903898d7 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -665,7 +665,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) { // Add our listener first, so that we guarantee socket cleanup res.on('end', responseOnEnd); - req.on('prefinish', requestOnPrefinish); + req.on('finish', requestOnFinish); socket.on('timeout', responseOnTimeout); // If the user did not listen for the 'response' event, then they @@ -737,12 +737,16 @@ function responseOnEnd() { socket.end(); } assert(!socket.writable); - } else if (req.finished && !this.aborted) { + } else if (req.writableFinished && !this.aborted) { + assert(req.finished); // We can assume `req.finished` means all data has been written since: // - `'responseOnEnd'` means we have been assigned a socket. // - when we have a socket we write directly to it without buffering. // - `req.finished` means `end()` has been called and no further data. // can be written + // In addition, `req.writableFinished` means all data written has been + // accepted by the kernel. (i.e. the `req.socket` is drained).Without + // this constraint, we may assign a non drained socket to a request. responseKeepAlive(req); } } @@ -755,7 +759,9 @@ function responseOnTimeout() { res.emit('timeout'); } -function requestOnPrefinish() { +// This function is necessary in the case where we receive the entire reponse +// from server before we finish sending out the request +function requestOnFinish() { const req = this; if (req.shouldKeepAlive && req._ended) diff --git a/lib/_http_outgoing.js b/lib/_http_outgoing.js index 006ac437a14938..dcdea29968590a 100644 --- a/lib/_http_outgoing.js +++ b/lib/_http_outgoing.js @@ -985,6 +985,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) { }; +// This function is called once all user data are flushed to the socket. +// Note that it has a chance that the socket is not drained. OutgoingMessage.prototype._finish = function _finish() { assert(this.socket); this.emit('prefinish'); @@ -1008,7 +1010,7 @@ OutgoingMessage.prototype._finish = function _finish() { // the socket yet. Thus the outgoing messages need to be prepared to queue // up data internally before sending it on further to the socket's queue. // -// This function, outgoingFlush(), is called by both the Server and Client +// This function, _flush(), is called by both the Server and Client // to attempt to flush any pending messages out to the socket. OutgoingMessage.prototype._flush = function _flush() { const socket = this.socket; diff --git a/test/parallel/test-http-agent-reuse-drained-socket-only.js b/test/parallel/test-http-agent-reuse-drained-socket-only.js new file mode 100644 index 00000000000000..2bd53f40edaaf3 --- /dev/null +++ b/test/parallel/test-http-agent-reuse-drained-socket-only.js @@ -0,0 +1,122 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const http = require('http'); +const net = require('net'); + +const agent = new http.Agent({ + keepAlive: true, + maxFreeSockets: Infinity, + maxSockets: Infinity, + maxTotalSockets: Infinity, +}); + +const server = net.createServer({ + pauseOnConnect: true, +}, (sock) => { + // Do not read anything from `sock` + sock.pause(); + sock.write('HTTP/1.1 200 OK\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\n\r\n'); +}); + +server.listen(0, common.mustCall(() => { + sendFstReq(server.address().port); +})); + +function sendFstReq(serverPort) { + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, (res) => { + res.on('data', noop); + res.on('end', common.mustCall(() => { + // Agent's socket reusing code is registered to process.nextTick(), + // and will be run after this function, make sure it take effect. + setImmediate(sendSecReq, serverPort, req.socket.localPort); + })); + }); + + // Make the `req.socket` non drained, i.e. has some data queued to write to + // and accept by the kernel. In Linux and Mac, we only need to call `req.end(aLargeBuffer)`. + // However, in Windows, the mechanism of acceptance is loose, the following code is a workaround + // for Windows. + + /** + * https://docs.microsoft.com/en-US/troubleshoot/windows/win32/data-segment-tcp-winsock says + * + * Winsock uses the following rules to indicate a send completion to the application + * (depending on how the send is invoked, the completion notification could be the + * function returning from a blocking call, signaling an event, or calling a notification + * function, and so forth): + * - If the socket is still within SO_SNDBUF quota, Winsock copies the data from the application + * send and indicates the send completion to the application. + * - If the socket is beyond SO_SNDBUF quota and there's only one previously buffered send still + * in the stack kernel buffer, Winsock copies the data from the application send and indicates + * the send completion to the application. + * - If the socket is beyond SO_SNDBUF quota and there's more than one previously buffered send + * in the stack kernel buffer, Winsock copies the data from the application send. Winsock doesn't + * indicate the send completion to the application until the stack completes enough sends to put + * back the socket within SO_SNDBUF quota or only one outstanding send condition. + */ + + req.on('socket', () => { + req.socket.on('connect', () => { + // Print tcp send buffer information + console.log(process.report.getReport().libuv.filter((handle) => handle.type === 'tcp')); + + const dataLargerThanTCPSendBuf = Buffer.alloc(1024 * 1024 * 64, 0); + + req.write(dataLargerThanTCPSendBuf); + req.uncork(); + if (process.platform === 'win32') { + assert.ok(req.socket.writableLength === 0); + } + + req.write(dataLargerThanTCPSendBuf); + req.uncork(); + if (process.platform === 'win32') { + assert.ok(req.socket.writableLength === 0); + } + + req.end(dataLargerThanTCPSendBuf); + assert.ok(req.socket.writableLength > 0); + }); + }); +} + +function sendSecReq(serverPort, fstReqCliPort) { + // Make the second request, which should be sent on a new socket + // because the first socket is not drained and hence can not be reused + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, (res) => { + res.on('data', noop); + res.on('end', common.mustCall(() => { + setImmediate(sendThrReq, serverPort, req.socket.localPort); + })); + }); + + req.on('socket', common.mustCall((sock) => { + assert.notStrictEqual(sock.localPort, fstReqCliPort); + })); + req.end(); +} + +function sendThrReq(serverPort, secReqCliPort) { + // Make the third request, the agent should reuse the second socket we just made + const req = http.request({ + agent, + host: '127.0.0.1', + port: serverPort, + }, noop); + + req.on('socket', common.mustCall((sock) => { + assert.strictEqual(sock.localPort, secReqCliPort); + process.exit(0); + })); +} + +function noop() { }