diff --git a/.autod.conf b/.autod.conf index ae55eee..ec9956d 100644 --- a/.autod.conf +++ b/.autod.conf @@ -19,8 +19,5 @@ module.exports = { keep: [ ], semver: [ - 'egg-bin@1', - 'eslint@4', - 'eslint-config-egg@6', ], }; diff --git a/.eslintignore b/.eslintignore deleted file mode 100644 index ef5e7c3..0000000 --- a/.eslintignore +++ /dev/null @@ -1 +0,0 @@ -lib/_http_agent.js diff --git a/.gitignore b/.gitignore index d48053e..8a646f2 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ results node_modules npm-debug.log coverage/ +.nyc_output/ diff --git a/.travis.yml b/.travis.yml index 9edb713..ce21122 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ sudo: false language: node_js node_js: - - '4' - - '6' - '8' - '10' install: diff --git a/README.md b/README.md index 8231458..ff166c2 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ [download-image]: https://img.shields.io/npm/dm/agentkeepalive.svg?style=flat-square [download-url]: https://npmjs.org/package/agentkeepalive -The Node.js's missing `keep alive` `http.Agent`. Support `http` and `https`. +The enhancement features `keep alive` `http.Agent`. Support `http` and `https`. ## What's different from original `http.Agent`? @@ -31,6 +31,7 @@ The Node.js's missing `keep alive` `http.Agent`. Support `http` and `https`. - Disable Nagle's algorithm: `socket.setNoDelay(true)` - Add free socket timeout: avoid long time inactivity socket leak in the free-sockets queue. - Add active socket timeout: avoid long time inactivity socket leak in the active-sockets queue. +- TTL for active socket. ## Install @@ -47,13 +48,13 @@ $ npm install agentkeepalive --save * `keepAliveMsecs` {Number} When using the keepAlive option, specifies the initial delay for TCP Keep-Alive packets. Ignored when the keepAlive option is false or undefined. Defaults to 1000. Default = `1000`. Only relevant if `keepAlive` is set to `true`. - * `freeSocketKeepAliveTimeout`: {Number} Sets the free socket to timeout - after `freeSocketKeepAliveTimeout` milliseconds of inactivity on the free socket. + * `freeSocketTimeout`: {Number} Sets the free socket to timeout + after `freeSocketTimeout` milliseconds of inactivity on the free socket. Default is `15000`. Only relevant if `keepAlive` is set to `true`. * `timeout`: {Number} Sets the working socket to timeout after `timeout` milliseconds of inactivity on the working socket. - Default is `freeSocketKeepAliveTimeout * 2`. + Default is `freeSocketTimeout * 2`. * `maxSockets` {Number} Maximum number of sockets to allow per host. Default = `Infinity`. * `maxFreeSockets` {Number} Maximum number of sockets (per host) to leave open @@ -72,8 +73,8 @@ const Agent = require('agentkeepalive'); const keepaliveAgent = new Agent({ maxSockets: 100, maxFreeSockets: 10, - timeout: 60000, - freeSocketKeepAliveTimeout: 30000, // free socket keepalive for 30 seconds + timeout: 60000, // active socket keepalive for 60 seconds + freeSocketTimeout: 30000, // free socket keepalive for 30 seconds }); const options = { @@ -211,7 +212,7 @@ Shortest transaction: 0.00 Socket created: -``` +```bash [proxy.js:120000] keepalive, 50 created, 60000 requestFinished, 1200 req/socket, 0 requests, 0 sockets, 0 unusedSockets, 50 timeout {" <10ms":662," <15ms":17825," <20ms":20552," <30ms":17646," <40ms":2315," <50ms":567," <100ms":377," <150ms":56," <200ms":0," >=200ms+":0} ---------------------------------------------------------------- @@ -221,7 +222,7 @@ Socket created: ## License -``` +```txt (The MIT License) Copyright(c) node-modules and other contributors. diff --git a/appveyor.yml b/appveyor.yml index d3108c3..981e82b 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,5 @@ environment: matrix: - - nodejs_version: '4' - - nodejs_version: '6' - nodejs_version: '8' - nodejs_version: '10' diff --git a/circle.yml b/circle.yml index 41f4597..a1b0430 100644 --- a/circle.yml +++ b/circle.yml @@ -1,6 +1,6 @@ machine: node: - version: '4' + version: '8' dependencies: override: @@ -8,10 +8,8 @@ dependencies: test: override: - - nvm i 4.3.2 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci - - nvm i 4 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci - - nvm i 6 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci - - nvm i 7 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci + - nvm i 8 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci + - nvm i 10 && rm -rf node_modules && npm i npminstall && node_modules/.bin/npminstall && npm run ci # https://circleci.com/docs/language-nodejs # https://discuss.circleci.com/t/testing-multiple-versions-of-node/542 diff --git a/example/agent.js b/example/agent.js index 3f0d56f..d4eb5ff 100644 --- a/example/agent.js +++ b/example/agent.js @@ -1,11 +1,9 @@ 'use strict'; const http = require('http'); -const Agent = require('../'); +const Agent = require('..'); -const keepaliveAgent = new Agent({ - keepAlive: true, -}); +const agent = new Agent(); // https://www.google.com/search?q=nodejs&sugexp=chrome,mod=12&sourceid=chrome&ie=UTF-8 const options = { @@ -13,7 +11,7 @@ const options = { path: '/', method: 'GET', port: 80, - agent: keepaliveAgent, + agent, }; function get() { @@ -36,15 +34,15 @@ function get() { get(); setTimeout(() => { - console.log('keep alive sockets:', keepaliveAgent); + console.log('keep alive sockets:', agent); process.exit(); }, 300000); let count = 0; -setInterval(function() { - const name = keepaliveAgent.getName(options); - const sockets = keepaliveAgent.sockets[name] || []; - const freeSockets = keepaliveAgent.freeSockets[name] || []; +setInterval(() => { + const name = agent.getName(options); + const sockets = agent.sockets[name] || []; + const freeSockets = agent.freeSockets[name] || []; console.log('%ss, %s, sockets: %d, destroyed: %s, free sockets: %d, destroyed: %s', ++count, name, sockets.length, sockets[0] && sockets[0].destroyed, freeSockets.length, freeSockets[0] && freeSockets[0].destroyed); diff --git a/example/https_agent.js b/example/https_agent.js index 7a18c4e..78f2a37 100644 --- a/example/https_agent.js +++ b/example/https_agent.js @@ -3,16 +3,14 @@ const https = require('https'); const HttpsAgent = require('..').HttpsAgent; -const keepaliveAgent = new HttpsAgent({ - keepAlive: true, -}); +const agent = new HttpsAgent(); // https://www.google.com/search?q=nodejs&sugexp=chrome,mod=12&sourceid=chrome&ie=UTF-8 const options = { host: 'github.com', port: 443, path: '/', method: 'GET', - agent: keepaliveAgent, + agent, }; let start = Date.now(); @@ -44,6 +42,6 @@ req.on('error', e => { req.end(); setTimeout(() => { - console.log('keep alive sockets:', keepaliveAgent); + console.log('keep alive sockets:', agent); process.exit(); }, 5000); diff --git a/index.d.ts b/index.d.ts index c11636f..1ef5de8 100644 --- a/index.d.ts +++ b/index.d.ts @@ -15,12 +15,16 @@ declare module "agentkeepalive" { } interface HttpOptions extends http.AgentOptions { + keepAlive?: boolean; + freeSocketTimeout?: number; freeSocketKeepAliveTimeout?: number; timeout?: number; socketActiveTTL?: number; } interface HttpsOptions extends https.AgentOptions { + keepAlive?: boolean; + freeSocketTimeout?: number; freeSocketKeepAliveTimeout?: number; timeout?: number; socketActiveTTL?: number; diff --git a/index.js b/index.js index 6138131..6ca1513 100644 --- a/index.js +++ b/index.js @@ -2,3 +2,4 @@ module.exports = require('./lib/agent'); module.exports.HttpsAgent = require('./lib/https_agent'); +module.exports.constants = require('./lib/constants'); diff --git a/lib/_http_agent.js b/lib/_http_agent.js deleted file mode 100644 index c324b7f..0000000 --- a/lib/_http_agent.js +++ /dev/null @@ -1,416 +0,0 @@ -// Copyright Joyent, Inc. and other Node contributors. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the -// "Software"), to deal in the Software without restriction, including -// without limitation the rights to use, copy, modify, merge, publish, -// distribute, sublicense, and/or sell copies of the Software, and to permit -// persons to whom the Software is furnished to do so, subject to the -// following conditions: -// -// The above copyright notice and this permission notice shall be included -// in all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN -// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, -// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR -// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE -// USE OR OTHER DEALINGS IN THE SOFTWARE. - -// patch from https://github.com/nodejs/node/blob/v7.2.1/lib/_http_agent.js - -'use strict'; - -const net = require('net'); -const util = require('util'); -const EventEmitter = require('events'); -const debug = util.debuglog('http'); - -// New Agent code. - -// The largest departure from the previous implementation is that -// an Agent instance holds connections for a variable number of host:ports. -// Surprisingly, this is still API compatible as far as third parties are -// concerned. The only code that really notices the difference is the -// request object. - -// Another departure is that all code related to HTTP parsing is in -// ClientRequest.onSocket(). The Agent is now *strictly* -// concerned with managing a connection pool. - -function Agent(options) { - if (!(this instanceof Agent)) - return new Agent(options); - - EventEmitter.call(this); - - var self = this; - - self.defaultPort = 80; - self.protocol = 'http:'; - - self.options = util._extend({}, options); - - // don't confuse net and make it think that we're connecting to a pipe - self.options.path = null; - self.requests = {}; - self.sockets = {}; - self.freeSockets = {}; - self.keepAliveMsecs = self.options.keepAliveMsecs || 1000; - self.keepAlive = self.options.keepAlive || false; - self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets; - self.maxFreeSockets = self.options.maxFreeSockets || 256; - - // [patch start] - // free keep-alive socket timeout. By default free socket do not have a timeout. - self.freeSocketKeepAliveTimeout = self.options.freeSocketKeepAliveTimeout || 0; - // working socket timeout. By default working socket do not have a timeout. - self.timeout = self.options.timeout || 0; - // the socket active time to live, even if it's in use - this.socketActiveTTL = this.options.socketActiveTTL || null; - // [patch end] - - self.on('free', function(socket, options) { - var name = self.getName(options); - debug('agent.on(free)', name); - - if (socket.writable && - self.requests[name] && self.requests[name].length) { - // [patch start] - debug('continue handle next request'); - // [patch end] - self.requests[name].shift().onSocket(socket); - if (self.requests[name].length === 0) { - // don't leak - delete self.requests[name]; - } - } else { - // If there are no pending requests, then put it in - // the freeSockets pool, but only if we're allowed to do so. - var req = socket._httpMessage; - if (req && - req.shouldKeepAlive && - socket.writable && - self.keepAlive) { - var freeSockets = self.freeSockets[name]; - var freeLen = freeSockets ? freeSockets.length : 0; - var count = freeLen; - if (self.sockets[name]) - count += self.sockets[name].length; - - if (count > self.maxSockets || freeLen >= self.maxFreeSockets) { - socket.destroy(); - } else { - freeSockets = freeSockets || []; - self.freeSockets[name] = freeSockets; - socket.setKeepAlive(true, self.keepAliveMsecs); - socket.unref(); - socket._httpMessage = null; - self.removeSocket(socket, options); - freeSockets.push(socket); - - // [patch start] - // Add a default error handler to avoid Unhandled 'error' event throw on idle socket - // https://github.com/node-modules/agentkeepalive/issues/25 - // https://github.com/nodejs/node/pull/4482 (fixed in >= 4.4.0 and >= 5.4.0) - if (socket.listeners('error').length === 0) { - socket.once('error', freeSocketErrorListener); - } - // set free keepalive timer - // try to use socket custom freeSocketKeepAliveTimeout first - const freeSocketKeepAliveTimeout = socket.freeSocketKeepAliveTimeout || self.freeSocketKeepAliveTimeout; - socket.setTimeout(freeSocketKeepAliveTimeout); - debug(`push to free socket queue and wait for ${freeSocketKeepAliveTimeout}ms`); - // [patch end] - } - } else { - socket.destroy(); - } - } - }); -} - -util.inherits(Agent, EventEmitter); -exports.Agent = Agent; - -// [patch start] -function freeSocketErrorListener(err) { - var socket = this; - debug('SOCKET ERROR on FREE socket:', err.message, err.stack); - socket.destroy(); - socket.emit('agentRemove'); -} -// [patch end] - -Agent.defaultMaxSockets = Infinity; - -Agent.prototype.createConnection = net.createConnection; - -// Get the key for a given set of request options -Agent.prototype.getName = function getName(options) { - var name = options.host || 'localhost'; - - name += ':'; - if (options.port) - name += options.port; - - name += ':'; - if (options.localAddress) - name += options.localAddress; - - // Pacify parallel/test-http-agent-getname by only appending - // the ':' when options.family is set. - if (options.family === 4 || options.family === 6) - name += ':' + options.family; - - return name; -}; - -// [patch start] -function handleSocketCreation(req) { - return function(err, newSocket) { - if (err) { - process.nextTick(function() { - req.emit('error', err); - }); - return; - } - req.onSocket(newSocket); - } -} -// [patch end] - -Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/, - localAddress/*legacy*/) { - // Legacy API: addRequest(req, host, port, localAddress) - if (typeof options === 'string') { - options = { - host: options, - port, - localAddress - }; - } - - options = util._extend({}, options); - options = util._extend(options, this.options); - - if (!options.servername) - options.servername = calculateServerName(options, req); - - var name = this.getName(options); - if (!this.sockets[name]) { - this.sockets[name] = []; - } - - var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0; - var sockLen = freeLen + this.sockets[name].length; - - if (freeLen) { - // we have a free socket, so use that. - var socket = this.freeSockets[name].shift(); - debug('have free socket'); - - // [patch start] - // remove free socket error event handler - socket.removeListener('error', freeSocketErrorListener); - // restart the default timer - socket.setTimeout(this.timeout); - - if (this.socketActiveTTL && Date.now() - socket.createdTime > this.socketActiveTTL) { - debug(`socket ${socket.createdTime} expired`); - socket.destroy(); - return this.createSocket(req, options, handleSocketCreation(req)); - } - // [patch end] - - // don't leak - if (!this.freeSockets[name].length) - delete this.freeSockets[name]; - - socket.ref(); - req.onSocket(socket); - this.sockets[name].push(socket); - } else if (sockLen < this.maxSockets) { - debug('call onSocket', sockLen, freeLen); - // If we are under maxSockets create a new one. - // [patch start] - this.createSocket(req, options, handleSocketCreation(req)); - // [patch end] - } else { - debug('wait for socket'); - // We are over limit so we'll add it to the queue. - if (!this.requests[name]) { - this.requests[name] = []; - } - this.requests[name].push(req); - } -}; - -Agent.prototype.createSocket = function createSocket(req, options, cb) { - var self = this; - options = util._extend({}, options); - options = util._extend(options, self.options); - - if (!options.servername) - options.servername = calculateServerName(options, req); - - var name = self.getName(options); - options._agentKey = name; - - debug('createConnection', name, options); - options.encoding = null; - var called = false; - const newSocket = self.createConnection(options, oncreate); - // [patch start] - if (newSocket) { - oncreate(null, Object.assign(newSocket, { createdTime: Date.now() })); - } - // [patch end] - function oncreate(err, s) { - if (called) - return; - called = true; - if (err) - return cb(err); - if (!self.sockets[name]) { - self.sockets[name] = []; - } - self.sockets[name].push(s); - debug('sockets', name, self.sockets[name].length); - - function onFree() { - self.emit('free', s, options); - } - s.on('free', onFree); - - function onClose(err) { - debug('CLIENT socket onClose'); - // This is the only place where sockets get removed from the Agent. - // If you want to remove a socket from the pool, just close it. - // All socket errors end in a close event anyway. - self.removeSocket(s, options); - - // [patch start] - self.emit('close'); - // [patch end] - } - s.on('close', onClose); - - // [patch start] - // start socket timeout handler - function onTimeout() { - debug('CLIENT socket onTimeout'); - s.destroy(); - // Remove it from freeSockets immediately to prevent new requests from being sent through this socket. - self.removeSocket(s, options); - self.emit('timeout'); - } - s.on('timeout', onTimeout); - // set the default timer - s.setTimeout(self.timeout); - // [patch end] - - function onRemove() { - // We need this function for cases like HTTP 'upgrade' - // (defined by WebSockets) where we need to remove a socket from the - // pool because it'll be locked up indefinitely - debug('CLIENT socket onRemove'); - self.removeSocket(s, options); - s.removeListener('close', onClose); - s.removeListener('free', onFree); - s.removeListener('agentRemove', onRemove); - - // [patch start] - // remove socket timeout handler - s.setTimeout(0, onTimeout); - // [patch end] - } - s.on('agentRemove', onRemove); - cb(null, s); - } -}; - -function calculateServerName(options, req) { - let servername = options.host; - const hostHeader = req.getHeader('host'); - if (hostHeader) { - // abc => abc - // abc:123 => abc - // [::1] => ::1 - // [::1]:123 => ::1 - if (hostHeader.startsWith('[')) { - const index = hostHeader.indexOf(']'); - if (index === -1) { - // Leading '[', but no ']'. Need to do something... - servername = hostHeader; - } else { - servername = hostHeader.substr(1, index - 1); - } - } else { - servername = hostHeader.split(':', 1)[0]; - } - } - return servername; -} - -Agent.prototype.removeSocket = function removeSocket(s, options) { - var name = this.getName(options); - debug('removeSocket', name, 'writable:', s.writable); - var sets = [this.sockets]; - - // If the socket was destroyed, remove it from the free buffers too. - if (!s.writable) - sets.push(this.freeSockets); - - for (var sk = 0; sk < sets.length; sk++) { - var sockets = sets[sk]; - - if (sockets[name]) { - var index = sockets[name].indexOf(s); - if (index !== -1) { - sockets[name].splice(index, 1); - // Don't leak - if (sockets[name].length === 0) - delete sockets[name]; - } - } - } - - // [patch start] - var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0; - var sockLen = freeLen + (this.sockets[name] ? this.sockets[name].length : 0); - // [patch end] - - if (this.requests[name] && this.requests[name].length && sockLen < this.maxSockets) { - debug('removeSocket, have a request, make a socket'); - var req = this.requests[name][0]; - // If we have pending requests and a socket gets closed make a new one - this.createSocket(req, options, function(err, newSocket) { - if (err) { - process.nextTick(function() { - req.emit('error', err); - }); - return; - } - newSocket.emit('free'); - }); - } -}; - -Agent.prototype.destroy = function destroy() { - var sets = [this.freeSockets, this.sockets]; - for (var s = 0; s < sets.length; s++) { - var set = sets[s]; - var keys = Object.keys(set); - for (var v = 0; v < keys.length; v++) { - var setName = set[keys[v]]; - for (var n = 0; n < setName.length; n++) { - setName[n].destroy(); - } - } - } -}; - -exports.globalAgent = new Agent(); diff --git a/lib/agent.js b/lib/agent.js index a51ad59..6e618e3 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,43 +1,64 @@ -/** - * refer: - * * @atimb "Real keep-alive HTTP agent": https://gist.github.com/2963672 - * * https://github.com/joyent/node/blob/master/lib/http.js - * * https://github.com/joyent/node/blob/master/lib/https.js - * * https://github.com/joyent/node/blob/master/lib/_http_agent.js - */ - 'use strict'; -const OriginalAgent = require('./_http_agent').Agent; +const OriginalAgent = require('http').Agent; const ms = require('humanize-ms'); +const debug = require('debug')('agentkeepalive'); +const deprecate = require('depd')('agentkeepalive'); +const { + INIT_SOCKET, + CURRENT_ID, + CREATE_ID, + SOCKET_CREATED_TIME, + SOCKET_NAME, + SOCKET_REQUEST_COUNT, + SOCKET_REQUEST_FINISHED_COUNT, +} = require('./constants'); + +// OriginalAgent come from +// - https://github.com/nodejs/node/blob/v8.12.0/lib/_http_agent.js +// - https://github.com/nodejs/node/blob/v10.12.0/lib/_http_agent.js class Agent extends OriginalAgent { constructor(options) { options = options || {}; options.keepAlive = options.keepAlive !== false; // default is keep-alive and 15s free socket timeout - if (options.freeSocketKeepAliveTimeout === undefined) { - options.freeSocketKeepAliveTimeout = 15000; + if (options.freeSocketTimeout === undefined) { + options.freeSocketTimeout = 15000; } - // Legacy API: keepAliveTimeout should be rename to `freeSocketKeepAliveTimeout` + // Legacy API: keepAliveTimeout should be rename to `freeSocketTimeout` if (options.keepAliveTimeout) { - options.freeSocketKeepAliveTimeout = options.keepAliveTimeout; + deprecate('options.keepAliveTimeout is deprecated, please use options.freeSocketTimeout instead'); + options.freeSocketTimeout = options.keepAliveTimeout; + delete options.keepAliveTimeout; + } + // Legacy API: freeSocketKeepAliveTimeout should be rename to `freeSocketTimeout` + if (options.freeSocketKeepAliveTimeout) { + deprecate('options.freeSocketKeepAliveTimeout is deprecated, please use options.freeSocketTimeout instead'); + options.freeSocketTimeout = options.freeSocketKeepAliveTimeout; + delete options.freeSocketKeepAliveTimeout; } - options.freeSocketKeepAliveTimeout = ms(options.freeSocketKeepAliveTimeout); // Sets the socket to timeout after timeout milliseconds of inactivity on the socket. - // By default is double free socket keepalive timeout. + // By default is double free socket timeout. if (options.timeout === undefined) { - options.timeout = options.freeSocketKeepAliveTimeout * 2; + options.timeout = options.freeSocketTimeout * 2; // make sure socket default inactivity timeout >= 30s if (options.timeout < 30000) { options.timeout = 30000; } } + + // support humanize format options.timeout = ms(options.timeout); + options.freeSocketTimeout = ms(options.freeSocketTimeout); + options.socketActiveTTL = options.socketActiveTTL ? ms(options.socketActiveTTL) : 0; super(options); + this[CURRENT_ID] = 0; + + // create socket success counter this.createSocketCount = 0; this.createSocketCountLastCheck = 0; @@ -51,42 +72,128 @@ class Agent extends OriginalAgent { this.errorSocketCount = 0; this.errorSocketCountLastCheck = 0; + // request finished counter this.requestCount = 0; this.requestCountLastCheck = 0; + // including free socket timeout counter this.timeoutSocketCount = 0; this.timeoutSocketCountLastCheck = 0; + } + + get freeSocketKeepAliveTimeout() { + deprecate('agent.freeSocketKeepAliveTimeout is deprecated, please use agent.options.freeSocketTimeout instead'); + return this.options.freeSocketTimeout; + } + + get timeout() { + deprecate('agent.timeout is deprecated, please use agent.options.timeout instead'); + return this.options.timeout; + } + + get socketActiveTTL() { + deprecate('agent.socketActiveTTL is deprecated, please use agent.options.socketActiveTTL instead'); + return this.options.socketActiveTTL; + } + + keepSocketAlive(socket) { + const result = super.keepSocketAlive(socket); + // should not keepAlive, do nothing + if (!result) return result; + + let freeSocketTimeout = this.options.freeSocketTimeout; + const socketActiveTTL = this.options.socketActiveTTL; + if (socketActiveTTL) { + // check socketActiveTTL + const aliveTime = Date.now() - socket[SOCKET_CREATED_TIME]; + const diff = socketActiveTTL - aliveTime; + // destroy it + if (diff <= 0) { + debug('%s(requests: %s, finished: %s) free but need to destroy by TTL, alive %sms(max %sms)', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], + aliveTime, socketActiveTTL); + return false; + } + + if (freeSocketTimeout && diff < freeSocketTimeout) { + debug('%s(requests: %s, finished: %s) free and wait for %sms TTL timeout', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], + diff); + freeSocketTimeout = diff; + } + } + // set freeSocketTimeout + if (freeSocketTimeout) { + // set free keepalive timer + // try to use socket custom freeSocketTimeout first, support headers['keep-alive'] + // https://github.com/node-modules/urllib/blob/b76053020923f4d99a1c93cf2e16e0c5ba10bacf/lib/urllib.js#L498 + const customFreeSocketTimeout = socket.freeSocketTimeout || socket.freeSocketKeepAliveTimeout; + if (customFreeSocketTimeout && customFreeSocketTimeout < freeSocketTimeout) { + freeSocketTimeout = customFreeSocketTimeout; + } + // FIXME: need to make setRequestSocket as a method on Agent class + // then we can reset the agent.options.timeout when free socket is reused. + socket.setTimeout(freeSocketTimeout); + } + return true; + } + + // only call on addRequest + reuseSocket(...args) { + // reuseSocket(socket, req) + super.reuseSocket(...args); + const socket = args[0]; + const agentTimeout = this.options.timeout; + if (socket.timeout !== agentTimeout) { + // reset timeout before use + socket.setTimeout(agentTimeout); + debug('%s reset timeout to %sms', socket[SOCKET_NAME], agentTimeout); + } + socket[SOCKET_REQUEST_COUNT]++; + debug('%s(requests: %s, finished: %s) reuse on addRequest, timeout %sms', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], + socket.timeout); + } + + [CREATE_ID]() { + const id = this[CURRENT_ID]++; + if (this[CURRENT_ID] === Number.MAX_SAFE_INTEGER) this[CURRENT_ID] = 0; + return id; + } + + [INIT_SOCKET](socket, options) { + if (this.keepAlive) { + // Disable Nagle's algorithm: http://blog.caustik.com/2012/04/08/scaling-node-js-to-100k-concurrent-connections/ + // https://fengmk2.com/benchmark/nagle-algorithm-delayed-ack-mock.html + socket.setNoDelay(true); + } + this.createSocketCount++; + if (this.options.socketActiveTTL) { + socket[SOCKET_CREATED_TIME] = Date.now(); + } + // don't show the hole '-----BEGIN CERTIFICATE----' key string + socket[SOCKET_NAME] = `sock[${this[CREATE_ID]()}#${options._agentKey}]`.split('-----BEGIN', 1)[0]; + socket[SOCKET_REQUEST_COUNT] = 1; + socket[SOCKET_REQUEST_FINISHED_COUNT] = 0; + installListeners(this, socket, options); + } + + createConnection(options, oncreate) { + let called = false; + const onNewCreate = (err, socket) => { + if (called) return; + called = true; - this.on('free', s => { - this.requestCount++; - // last enter free queue timestamp - s.lastFreeTime = Date.now(); - }); - this.on('timeout', () => { - this.timeoutSocketCount++; - }); - this.on('close', () => { - this.closeSocketCount++; - }); - this.on('error', () => { - this.errorSocketCount++; - }); - } - - createSocket(req, options, cb) { - super.createSocket(req, options, (err, socket) => { if (err) { this.createSocketErrorCount++; - return cb(err); - } - if (this.keepAlive) { - // Disable Nagle's algorithm: http://blog.caustik.com/2012/04/08/scaling-node-js-to-100k-concurrent-connections/ - // https://fengmk2.com/benchmark/nagle-algorithm-delayed-ack-mock.html - socket.setNoDelay(true); + return oncreate(err); } - this.createSocketCount++; - cb(null, socket); - }); + this[INIT_SOCKET](socket, options); + oncreate(err, socket); + }; + + const newSocket = super.createConnection(options, onNewCreate); + if (newSocket) onNewCreate(null, newSocket); } get statusChanged() { @@ -122,6 +229,109 @@ class Agent extends OriginalAgent { } } +function installListeners(agent, socket, options) { + debug('%s create, timeout %sms', socket[SOCKET_NAME], socket.timeout); + + // listener socket events: close, timeout, error, free + function onFree() { + // create and socket.emit('free') logic + // https://github.com/nodejs/node/blob/master/lib/_http_agent.js#L311 + // no req on the socket, it should be the new socket + if (!socket._httpMessage && socket[SOCKET_REQUEST_COUNT] === 1) return; + + socket[SOCKET_REQUEST_FINISHED_COUNT]++; + agent.requestCount++; + debug('%s(requests: %s, finished: %s) free', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT]); + + // should reuse on pedding requests? + const name = agent.getName(options); + if (socket.writable && agent.requests[name] && agent.requests[name].length) { + // will be reuse on agent free listener + socket[SOCKET_REQUEST_COUNT]++; + debug('%s(requests: %s, finished: %s) will be reuse on agent free event', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT]); + } + } + socket.on('free', onFree); + + function onClose(isError) { + debug('%s(requests: %s, finished: %s) close, isError: %s', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], isError); + agent.closeSocketCount++; + } + socket.on('close', onClose); + + // start socket timeout handler + function onTimeout() { + const listenerCount = socket.listeners('timeout').length; + debug('%s(requests: %s, finished: %s) timeout after %sms, listeners %s', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], + socket.timeout, listenerCount); + agent.timeoutSocketCount++; + const name = agent.getName(options); + if (agent.freeSockets[name] && agent.freeSockets[name].indexOf(socket) !== -1) { + // free socket timeout, destroy quietly + socket.destroy(); + // Remove it from freeSockets list immediately to prevent new requests + // from being sent through this socket. + agent.removeSocket(socket, options); + debug('%s is free, destroy quietly', socket[SOCKET_NAME]); + } else { + // if there is no any request socket timeout handler, + // agent need to handle socket timeout itself. + // + // custom request socket timeout handle logic must follow these rules: + // 1. Destroy socket first + // 2. Must emit socket 'agentRemove' event tell agent remove socket + // from freeSockets list immediately. + // Otherise you may be get 'socket hang up' error when reuse + // free socket and timeout happen in the same time. + if (listenerCount === 1) { + const error = new Error('Socket timeout'); + error.code = 'ERR_SOCKET_TIMEOUT'; + error.timeout = socket.timeout; + // must manually call socket.end() or socket.destroy() to end the connection. + // https://nodejs.org/dist/latest-v10.x/docs/api/net.html#net_socket_settimeout_timeout_callback + socket.destroy(error); + agent.removeSocket(socket, options); + debug('%s destroy with timeout error', socket[SOCKET_NAME]); + } + } + } + socket.on('timeout', onTimeout); + + function onError(err) { + const listenerCount = socket.listeners('error').length; + debug('%s(requests: %s, finished: %s) error: %s, listenerCount: %s', + socket[SOCKET_NAME], socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT], + err, listenerCount); + agent.errorSocketCount++; + if (listenerCount === 1) { + // if socket don't contain error event handler, don't catch it, emit it again + debug('%s emit uncaught error event', socket[SOCKET_NAME]); + socket.removeListener('error', onError); + socket.emit('error', err); + } + } + socket.on('error', onError); + + function onRemove() { + debug('%s(requests: %s, finished: %s) agentRemove', + socket[SOCKET_NAME], + socket[SOCKET_REQUEST_COUNT], socket[SOCKET_REQUEST_FINISHED_COUNT]); + // We need this function for cases like HTTP 'upgrade' + // (defined by WebSockets) where we need to remove a socket from the + // pool because it'll be locked up indefinitely + socket.removeListener('close', onClose); + socket.removeListener('error', onError); + socket.removeListener('free', onFree); + socket.removeListener('timeout', onTimeout); + socket.removeListener('agentRemove', onRemove); + } + socket.on('agentRemove', onRemove); +} + module.exports = Agent; function inspect(obj) { diff --git a/lib/constants.js b/lib/constants.js new file mode 100644 index 0000000..ca7ab97 --- /dev/null +++ b/lib/constants.js @@ -0,0 +1,14 @@ +'use strict'; + +module.exports = { + // agent + CURRENT_ID: Symbol('agentkeepalive#currentId'), + CREATE_ID: Symbol('agentkeepalive#createId'), + INIT_SOCKET: Symbol('agentkeepalive#initSocket'), + CREATE_HTTPS_CONNECTION: Symbol('agentkeepalive#createHttpsConnection'), + // socket + SOCKET_CREATED_TIME: Symbol('agentkeepalive#socketCreatedTime'), + SOCKET_NAME: Symbol('agentkeepalive#socketName'), + SOCKET_REQUEST_COUNT: Symbol('agentkeepalive#socketRequestCount'), + SOCKET_REQUEST_FINISHED_COUNT: Symbol('agentkeepalive#socketRequestFinishedCount'), +}; diff --git a/lib/https_agent.js b/lib/https_agent.js index e6d58a3..73f529d 100644 --- a/lib/https_agent.js +++ b/lib/https_agent.js @@ -1,12 +1,11 @@ -/** - * Https Agent base on custom http agent - */ - 'use strict'; -const https = require('https'); +const OriginalHttpsAgent = require('https').Agent; const HttpAgent = require('./agent'); -const OriginalHttpsAgent = https.Agent; +const { + INIT_SOCKET, + CREATE_HTTPS_CONNECTION, +} = require('./constants'); class HttpsAgent extends HttpAgent { constructor(options) { @@ -15,6 +14,7 @@ class HttpsAgent extends HttpAgent { this.defaultPort = 443; this.protocol = 'https:'; this.maxCachedSessions = this.options.maxCachedSessions; + /* istanbul ignore next */ if (this.maxCachedSessions === undefined) { this.maxCachedSessions = 100; } @@ -24,16 +24,25 @@ class HttpsAgent extends HttpAgent { list: [], }; } + + createConnection(options) { + const socket = this[CREATE_HTTPS_CONNECTION](options); + this[INIT_SOCKET](socket, options); + return socket; + } } +// https://github.com/nodejs/node/blob/master/lib/https.js#L89 +HttpsAgent.prototype[CREATE_HTTPS_CONNECTION] = OriginalHttpsAgent.prototype.createConnection; + [ - 'createConnection', 'getName', '_getSession', '_cacheSession', // https://github.com/nodejs/node/pull/4982 '_evictSession', ].forEach(function(method) { + /* istanbul ignore next */ if (typeof OriginalHttpsAgent.prototype[method] === 'function') { HttpsAgent.prototype[method] = OriginalHttpsAgent.prototype[method]; } diff --git a/package.json b/package.json index a28305b..45eacf2 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,8 @@ "lib" ], "scripts": { - "test": "egg-bin test", + "test": "npm run lint && egg-bin test", + "test-local": "egg-bin test", "cov": "egg-bin cov", "ci": "npm run lint && npm run cov", "lint": "eslint lib test index.js", @@ -29,24 +30,29 @@ "https", "agent", "keepalive", - "agentkeepalive" + "agentkeepalive", + "HttpAgent", + "HttpsAgent" ], "dependencies": { + "debug": "^4.1.0", + "depd": "^1.1.2", "humanize-ms": "^1.2.1" }, "devDependencies": { "autod": "^3.0.1", - "egg-bin": "^1.11.1", - "egg-ci": "^1.8.0", - "eslint": "^4.19.1", - "eslint-config-egg": "^6.0.0", + "egg-bin": "^4.9.0", + "egg-ci": "^1.10.0", + "eslint": "^5.7.0", + "eslint-config-egg": "^7.1.0", + "mm": "^2.4.1", "pedding": "^1.1.0" }, "engines": { - "node": ">= 4.0.0" + "node": ">= 8.0.0" }, "ci": { - "version": "4, 6, 8, 10" + "version": "8, 10" }, "author": "fengmk2 (https://fengmk2.com)", "license": "MIT" diff --git a/test/agent.test.js b/test/agent.test.js index 9e883ca..c5cb4f4 100644 --- a/test/agent.test.js +++ b/test/agent.test.js @@ -4,7 +4,15 @@ const assert = require('assert'); const http = require('http'); const urlparse = require('url').parse; const pedding = require('pedding'); -const Agent = require('../'); +const mm = require('mm'); +const Agent = require('..'); +const { + CURRENT_ID, + SOCKET_NAME, + SOCKET_CREATED_TIME, + SOCKET_REQUEST_COUNT, + SOCKET_REQUEST_FINISHED_COUNT, +} = require('..').constants; describe('test/agent.test.js', () => { const agentkeepalive = new Agent({ @@ -19,6 +27,7 @@ describe('test/agent.test.js', () => { res.destroy(); return; } else if (req.url === '/hang') { + console.log('[new request] %s %s', req.method, req.url); // Wait forever. return; } else if (req.url === '/remote_close') { @@ -30,7 +39,7 @@ describe('test/agent.test.js', () => { if (info.query.timeout) { setTimeout(() => { res.end(info.query.timeout); - }, parseInt(info.query.timeout, 10)); + }, parseInt(info.query.timeout)); return; } res.end(JSON.stringify({ @@ -48,7 +57,12 @@ describe('test/agent.test.js', () => { }); }); - after(done => setTimeout(done, 1500)); + afterEach(mm.restore); + + after(done => setTimeout(() => { + agentkeepalive.destroy(); + done(); + }, 1500)); it('should default options set right', () => { const agent = agentkeepalive; @@ -57,7 +71,12 @@ describe('test/agent.test.js', () => { assert(agent.maxSockets === 5); assert(agent.maxFreeSockets === 5); assert(agent.timeout === 30000); + assert(agent.options.timeout === 30000); + assert(agent.freeSocketKeepAliveTimeout === 1000); + assert(agent.options.freeSocketTimeout === 1000); assert(!agent.socketActiveTTL); + assert(agent.socketActiveTTL === 0); + assert(agent.options.socketActiveTTL === 0); }); let remotePort = null; @@ -106,12 +125,14 @@ describe('test/agent.test.js', () => { req.end(); }); - it('should inactivity socket timeout', done => { + it('should destroy inactivity socket timeout by agent itself', done => { const name = 'localhost:' + port + ':'; const agentkeepalive = new Agent({ freeSocketKeepAliveTimeout: '5s', timeout: '1s', }); + assert(agentkeepalive.options.freeSocketTimeout === 5000); + assert(agentkeepalive.options.timeout === 1000); assert(!agentkeepalive.sockets[name]); assert(!agentkeepalive.freeSockets[name]); http.get({ @@ -121,11 +142,14 @@ describe('test/agent.test.js', () => { }, res => { assert(res.statusCode === 200); const chunks = []; + res.resume(); res.on('data', data => { chunks.push(data); }); res.on('end', () => { - const data = JSON.parse(Buffer.concat(chunks)); + const buf = Buffer.concat(chunks); + console.log('end and got %d bytes', buf.length); + const data = JSON.parse(buf); remotePort = data.socket.port; assert(data.headers.connection === 'keep-alive'); assert(agentkeepalive.sockets[name]); @@ -143,11 +167,69 @@ describe('test/agent.test.js', () => { }, () => { assert(false, 'should not run this'); }).on('error', err => { + assert(err.message === 'Socket timeout'); + assert(err.code === 'ERR_SOCKET_TIMEOUT'); + done(); + }); + }, 20); + }); + }); + }); + + it('should let request handle the socket timeout', done => { + const name = 'localhost:' + port + ':'; + const agentkeepalive = new Agent({ + freeSocketKeepAliveTimeout: '5s', + timeout: '1s', + }); + assert(agentkeepalive.options.freeSocketTimeout === 5000); + assert(agentkeepalive.options.timeout === 1000); + assert(!agentkeepalive.sockets[name]); + assert(!agentkeepalive.freeSockets[name]); + http.get({ + agent: agentkeepalive, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + const chunks = []; + res.resume(); + res.on('data', data => { + chunks.push(data); + }); + res.on('end', () => { + const buf = Buffer.concat(chunks); + console.log('end and got %d bytes', buf.length); + const data = JSON.parse(buf); + remotePort = data.socket.port; + assert(data.headers.connection === 'keep-alive'); + assert(agentkeepalive.sockets[name]); + assert(!agentkeepalive.freeSockets[name]); + setTimeout(() => { + assert(!agentkeepalive.sockets[name]); + assert(agentkeepalive.freeSockets[name]); + assert(agentkeepalive.freeSockets[name].length === 1); + + // request /hang timeout + let handleTimeout = false; + const req = http.get({ + agent: agentkeepalive, + port, + path: '/hang', + timeout: 2000, + }, () => { + assert(false, 'should not run this'); + }).on('error', err => { + assert(handleTimeout); // TODO: should be a better error message than "socket hang up" assert(err.message === 'socket hang up'); assert(err.code === 'ECONNRESET'); done(); }); + req.on('timeout', () => { + handleTimeout = true; + req.abort(); + }); }, 20); }); }); @@ -189,6 +271,207 @@ describe('test/agent.test.js', () => { assert(!status.freeSockets[name]); }); + it('should mock CURRENT_ID cross MAX_SAFE_INTEGER', _done => { + const agent = new Agent({ + timeout: 1000, + freeSocketTimeout: 1000, + maxSockets: 10, + maxFreeSockets: 5, + }); + agent[CURRENT_ID] = Number.MAX_SAFE_INTEGER - 1; + const done = pedding(300, () => { + // only allow 10 sockets + assert(agent[CURRENT_ID] === 9); + setImmediate(() => { + const name = 'localhost:' + port + ':'; + assert(agent.freeSockets[name].length === 5); + agent.destroy(); + _done(); + }); + }); + function request(callback) { + http.get({ + agent, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', callback); + }); + } + for (let i = 0; i < 300; i++) { + request(done); + } + }); + + it('should work on timeout same as freeSocketTimeout', done => { + const agent = new Agent({ + timeout: 1000, + freeSocketTimeout: 1000, + }); + + http.get({ + agent, + port, + path: '/', + }, res => { + const socket1 = res.socket; + assert(socket1.timeout === 1000); + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + setImmediate(() => { + assert(socket1.timeout === 1000); + http.get({ + agent, + port, + path: '/', + }, res => { + const socket2 = res.socket; + assert(socket2 === socket1); + assert(socket2.timeout === 1000); + assert(res.statusCode === 200); + res.resume(); + res.on('end', done); + }); + }); + }); + }); + }); + + it('should work on freeSocketTimeout = 0', done => { + const agent = new Agent({ + timeout: 100, + freeSocketTimeout: 0, + }); + + http.get({ + agent, + port, + path: '/?timeout=80', + }, res => { + const socket1 = res.socket; + assert(socket1.timeout === 100); + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + setTimeout(() => { + assert(socket1.timeout === 100); + http.get({ + agent, + port, + path: '/', + }, res => { + const socket2 = res.socket; + assert(socket2 === socket1); + assert(socket2.timeout === 100); + assert(res.statusCode === 200); + res.resume(); + res.on('end', done); + }); + }, 80); + }); + }); + }); + + it('should createConnection error', done => { + const agent = new Agent(); + mm.error(require('http').Agent.prototype, 'createConnection', 'mock createConnection error'); + http.get({ + agent, + port, + path: '/', + }).on('error', err => { + assert(err); + assert(err.message === 'mock createConnection error'); + done(); + }); + }); + + it('should keepSocketAlive return false, no use any socket', done => { + const agent = new Agent(); + mm(require('http').Agent.prototype, 'keepSocketAlive', () => { + return false; + }); + http.get({ + agent, + port, + path: '/', + }, res => { + const socket1 = res.socket; + res.resume(); + res.on('end', () => { + setImmediate(() => { + http.get({ + agent, + port, + path: '/', + }, res => { + const socket2 = res.socket; + assert(socket2 !== socket1); + res.resume(); + res.on('end', done); + }); + }); + }); + }); + }); + + it('should agent emit socket error event', done => { + const agent = new Agent({ + timeout: 100, + }); + http.get({ + agent, + port, + path: '/hang', + }); + // remove mocha default handler + const originalException = process.listeners('uncaughtException').pop(); + process.removeListener('uncaughtException', originalException); + process.once('uncaughtException', err => { + process.on('uncaughtException', originalException); + assert(err); + assert(err.message === 'Socket timeout'); + done(); + }); + }); + + it('should mock socket error', done => { + done = pedding(2, done); + const agent = new Agent({ + timeout: 100, + }); + const req = http.get({ + agent, + port, + path: '/hang', + }); + req.on('socket', socket => { + // remove req error listener + const listener = socket.listeners('error').pop(); + socket.removeListener('error', listener); + // must destroy before emit error + socket.destroy(); + socket.emit('error', new Error('mock socket error')); + }).on('error', err => { + assert(err); + assert(err.message === 'socket hang up'); + done(); + }); + // remove mocha default handler + const originalException = process.listeners('uncaughtException').pop(); + process.removeListener('uncaughtException', originalException); + assert(process.listeners('uncaughtException').length === 0); + process.once('uncaughtException', err => { + process.on('uncaughtException', originalException); + assert(err); + assert(err.message === 'mock socket error'); + done(); + }); + }); + it('should request again and use the same socket', done => { const name = 'localhost:' + port + ':'; assert(!agentkeepalive.sockets[name]); @@ -230,36 +513,62 @@ describe('test/agent.test.js', () => { }); it('should remove keepalive socket when server side destroy()', done => { - done = pedding(2, done); - const name = 'localhost:' + port + ':'; - assert(!agentkeepalive.sockets[name]); - assert(agentkeepalive.freeSockets[name].length === 1); - - // should emit agent close event too - agentkeepalive.once('close', done); + const agent = new Agent({ + keepAliveTimeout: 1000, + maxSockets: 5, + maxFreeSockets: 5, + }); - const req = http.get({ - agent: agentkeepalive, + http.get({ + agent, port, - path: '/error', - }, () => { - throw new Error('should not call this'); - }); - req.on('error', err => { - assert(err.message === 'socket hang up'); - assert(agentkeepalive.sockets[name].length === 1); - assert(!agentkeepalive.freeSockets[name]); - setTimeout(() => { - assert(!agentkeepalive.sockets[name]); - assert(!agentkeepalive.freeSockets[name]); - done(); - }, 10); + path: '/foo', + }, res => { + assert(res.statusCode === 200); + const chunks = []; + res.on('data', data => { + chunks.push(data); + }); + res.on('end', () => { + const data = JSON.parse(Buffer.concat(chunks)); + assert(data.socket.port); + setTimeout(next, 1); + }); }); - assert(agentkeepalive.sockets[name].length === 1); - assert(!agentkeepalive.freeSockets[name]); + + function next() { + const name = 'localhost:' + port + ':'; + assert(!agent.sockets[name]); + assert(agent.freeSockets[name] && agent.freeSockets[name].length === 1); + + const req = http.get({ + agent, + port, + path: '/error', + }, () => { + assert.fail('should not call this'); + }); + req.on('error', err => { + assert(err.message === 'socket hang up'); + assert(agent.sockets[name].length === 1); + assert(!agent.freeSockets[name]); + setTimeout(() => { + assert(!agent.sockets[name]); + assert(!agent.freeSockets[name]); + done(); + }, 10); + }); + assert(agent.sockets[name].length === 1); + assert(!agent.freeSockets[name]); + } }); it('should remove socket when socket.destroy()', done => { + const agentkeepalive = new Agent({ + freeSocketTimeout: 1000, + maxSockets: 5, + maxFreeSockets: 5, + }); const name = 'localhost:' + port + ':'; assert(!agentkeepalive.sockets[name]); assert(!agentkeepalive.freeSockets[name]); @@ -288,6 +597,11 @@ describe('test/agent.test.js', () => { }); it('should use new socket when hit the max keepalive time: 1000ms', done => { + const agentkeepalive = new Agent({ + freeSocketTimeout: 1000, + maxSockets: 5, + maxFreeSockets: 5, + }); const name = 'localhost:' + port + ':'; assert(!agentkeepalive.sockets[name]); assert(!agentkeepalive.freeSockets[name]); @@ -358,6 +672,11 @@ describe('test/agent.test.js', () => { }); it('should not keepalive when client.abort()', done => { + const agentkeepalive = new Agent({ + freeSocketTimeout: 1000, + maxSockets: 5, + maxFreeSockets: 5, + }); const name = 'localhost:' + port + ':'; assert(!agentkeepalive.sockets[name]); const req = http.get({ @@ -365,7 +684,7 @@ describe('test/agent.test.js', () => { port, path: '/', }, () => { - throw new Error('should not call this.'); + assert.fail('should not call this.'); }); req.on('error', err => { assert(err.message, 'socket hang up'); @@ -561,28 +880,33 @@ describe('test/agent.test.js', () => { }); }); - it('should fire timeout callback', done => { + it('should fire req timeout callback the first use socket', done => { done = pedding(2, done); - const lastStatus = agentkeepalive.getCurrentStatus(); + const agent = new Agent({ + maxSockets: 2, + maxFreeSockets: 2, + }); http.get({ - agent: agentkeepalive, + agent, port, path: '/', }, res => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { + const lastStatus = agent.getCurrentStatus(); const req = http.get({ - agent: agentkeepalive, + agent, port, path: '/hang', }, () => { - throw new Error('should not call this'); + assert.fail('should not call this'); }); - req.setTimeout(400, () => { - const status = agentkeepalive.getCurrentStatus(); + req.setTimeout(100, () => { + const status = agent.getCurrentStatus(); assert(status.timeoutSocketCount - lastStatus.timeoutSocketCount === 1); - setTimeout(done, 300); + req.abort(); + done(); }); req.on('error', err => { assert(err.message === 'socket hang up'); @@ -592,13 +916,52 @@ describe('test/agent.test.js', () => { }); }); - it('should free socket timeout and emit agent timeout event', done => { + it('should fire req timeout callback the second use socket', done => { done = pedding(2, done); + const agent = new Agent({ + maxSockets: 2, + maxFreeSockets: 2, + }); + http.get({ + agent, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + const lastStatus = agent.getCurrentStatus(); + assert(lastStatus.createSocketCount === 1); + // make sure reuse the same socket + setImmediate(() => { + const req = http.get({ + agent, + port, + path: '/hang', + }, () => { + assert.fail('should not call this'); + }); + req.setTimeout(100, () => { + const status = agent.getCurrentStatus(); + assert(status.createSocketCount === 1); + assert(status.timeoutSocketCount - lastStatus.timeoutSocketCount === 1); + req.abort(); + done(); + }); + req.on('error', err => { + assert(err.message === 'socket hang up'); + done(); + }); + }); + }); + }); + }); + + it('should free socket timeout work', done => { const name = 'localhost:' + port + ':'; const agent = new Agent({ - keepAliveTimeout: 1000, + keepAliveTimeout: 100, }); - agent.on('timeout', done); let lastPort = null; http.get({ @@ -617,24 +980,21 @@ describe('test/agent.test.js', () => { process.nextTick(() => { assert(!agent.sockets[name]); assert(agent.freeSockets[name].length === 1); - // free socket timeout after 1s + // free socket timeout after 100ms setTimeout(() => { assert(!agent.freeSockets[name]); done(); - }, 1100); + }, 110); }); }); }); }); - it('should working socket timeout and emit agent timeout event', done => { - done = pedding(2, done); + it('should first use working socket timeout', done => { const name = 'localhost:' + port + ':'; const agent = new Agent({ - timeout: 1000, + timeout: 100, }); - agent.on('timeout', done); - http.get({ agent, port, @@ -642,19 +1002,49 @@ describe('test/agent.test.js', () => { }, () => { throw new Error('should not run this'); }).on('error', err => { - assert(err.message === 'socket hang up'); - assert(err.code === 'ECONNRESET'); + assert(err.message === 'Socket timeout'); + assert(err.code === 'ERR_SOCKET_TIMEOUT'); assert(!agent.sockets[name]); done(); }); assert(agent.sockets[name].length === 1); }); - it('should destroy free socket before timeout', done => { + it('should reuse working socket timeout', done => { const name = 'localhost:' + port + ':'; const agent = new Agent({ - keepAliveTimeout: 1000, + timeout: 100, }); + http.get({ + agent, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + setImmediate(() => { + http.get({ + agent, + port, + path: '/hang', + }, () => { + throw new Error('should not run this'); + }).on('error', err => { + assert(err.message === 'Socket timeout'); + assert(err.code === 'ERR_SOCKET_TIMEOUT'); + assert(!agent.sockets[name]); + done(); + }); + }); + }); + }); + assert(agent.sockets[name].length === 1); + }); + + it('should destroy free socket before timeout', done => { + const name = 'localhost:' + port + ':'; + const agent = new Agent(); let lastPort = null; http.get({ agent, @@ -673,9 +1063,21 @@ describe('test/agent.test.js', () => { assert(!agent.sockets[name]); assert(agent.freeSockets[name].length === 1); agent.freeSockets[name][0].destroy(); + assert(agent.createSocketCount === 1); setTimeout(() => { assert(!agent.freeSockets[name]); - done(); + // new request use the new socket + http.get({ + agent, + port, + path: '/', + }, res => { + assert(agent.sockets[name].length === 1); + assert(res.statusCode === 200); + assert(agent.createSocketCount === 2); + res.resume(); + res.on('end', done); + }); }, 10); }); }); @@ -687,7 +1089,6 @@ describe('test/agent.test.js', () => { done = pedding(2, done); const name = 'localhost:' + port + ':'; const agent = new Agent({ - keepAliveTimeout: 1000, maxSockets: 1, maxFreeSockets: 1, }); @@ -708,6 +1109,10 @@ describe('test/agent.test.js', () => { path: '/', }, res => { assert(agent.sockets[name].length === 1); + const socket = agent.sockets[name][0]; + assert(socket[SOCKET_REQUEST_COUNT] === 1); + // not finish + assert(socket[SOCKET_REQUEST_FINISHED_COUNT] === 0); assert(res.statusCode === 200); res.on('data', data => { data = JSON.parse(data); @@ -718,6 +1123,10 @@ describe('test/agent.test.js', () => { process.nextTick(() => { assert(!agent.sockets[name]); assert(agent.freeSockets[name].length === 1); + const socket = agent.freeSockets[name][0]; + assert(socket[SOCKET_REQUEST_COUNT] === 1); + // request finished + assert(socket[SOCKET_REQUEST_FINISHED_COUNT] === 1); done(); }); }); @@ -725,13 +1134,10 @@ describe('test/agent.test.js', () => { assert(agent.requests[name].length === 1); }); - it('should destroy all sockets', done => { + it('should destroy all sockets when freeSockets is empty', done => { done = pedding(2, done); const name = 'localhost:' + port + ':'; - const agent = new Agent({ - keepAliveTimeout: 1000, - }); - let lastPort = null; + const agent = new Agent(); http.get({ agent, port, @@ -751,31 +1157,64 @@ describe('test/agent.test.js', () => { }); assert(res.statusCode === 200); - res.on('data', data => { - data = JSON.parse(data); - lastPort = data.socket.port; - assert(lastPort > 0); - }); + res.resume(); res.on('end', () => { + assert(agent.sockets[name].length === 2); agent.destroy(); done(); }); }); }); + it('should destroy both sockets and freeSockets', done => { + done = pedding(2, done); + const name = 'localhost:' + port + ':'; + const agent = new Agent(); + http.get({ + agent, + port, + path: '/', + }, res => { + http.get({ + agent, + port, + path: '/', + }).on('error', err => { + assert(err.message === 'socket hang up'); + setTimeout(() => { + assert(!agent.sockets[name]); + assert(!agent.freeSockets[name]); + done(); + }, 10); + }); + + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(agent.sockets[name].length === 2); + assert(!agent.freeSockets[name]); + setImmediate(() => { + assert(agent.sockets[name].length === 1); + assert(agent.freeSockets[name].length === 1); + agent.destroy(); + done(); + }); + }); + }); + }); + it('should keep max sockets: bugfix for orginal keepalive agent', _done => { + const name = 'localhost:' + port + ':'; const agentkeepalive = new Agent({ - keepAlive: true, - keepAliveMsecs: 1000, maxSockets: 2, maxFreeSockets: 2, }); const done = pedding(2, err => { assert(!err); - const pool = agentkeepalive.sockets[Object.keys(agentkeepalive.sockets)[0]]; + const pool = agentkeepalive.sockets[name]; assert(!pool); // all sockets on free list now - const freepool = agentkeepalive.freeSockets[Object.keys(agentkeepalive.freeSockets)[0]]; + const freepool = agentkeepalive.freeSockets[name]; assert(freepool.length === 2); _done(); }); @@ -788,9 +1227,8 @@ describe('test/agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const pool = agentkeepalive.sockets[Object.keys(agentkeepalive.sockets)[0]]; - assert(pool); - setTimeout(done, 10); + assert(agentkeepalive.sockets[name]); + setImmediate(done); }); }); @@ -802,19 +1240,96 @@ describe('test/agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const pool = agentkeepalive.sockets[Object.keys(agentkeepalive.sockets)[0]]; - assert(pool); - setTimeout(done, 10); + assert(agentkeepalive.sockets[name]); + setImmediate(done); }); }); }); + it('should make sure max sockets limit work', _done => { + const name = 'localhost:' + port + ':'; + const agentkeepalive = new Agent({ + maxSockets: 2, + maxFreeSockets: 2, + }); + const done = pedding(3, err => { + assert(!err); + const pool = agentkeepalive.sockets[name]; + assert(!pool); + // all sockets on free list now + const freepool = agentkeepalive.freeSockets[name]; + assert(freepool.length === 2); + // make sure all free sockets SOCKET_REQUEST_FINISHED_COUNT equal to SOCKET_REQUEST_COUNT + for (const s of freepool) { + assert(s[SOCKET_REQUEST_FINISHED_COUNT] === s[SOCKET_REQUEST_COUNT]); + } + _done(); + }); + + http.get({ + agent: agentkeepalive, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(agentkeepalive.sockets[name]); + setImmediate(done); + }); + }); + + http.get({ + agent: agentkeepalive, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(agentkeepalive.sockets[name]); + setImmediate(done); + }); + }); + + http.get({ + agent: agentkeepalive, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(agentkeepalive.sockets[name]); + setImmediate(() => { + // reuse free socket on addRequest + assert(agentkeepalive.freeSockets[name]); + http.get({ + agent: agentkeepalive, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(agentkeepalive.sockets[name]); + setImmediate(done); + }); + }); + }); + }); + }); + assert(agentkeepalive.sockets[name].length === 2); + assert(!agentkeepalive.freeSockets[name]); + }); + it('should timeout and remove free socket', done => { done = pedding(2, done); - const _keepaliveAgent = new Agent({ + const name = 'localhost:' + port + ':'; + const agent = new Agent({ maxSockets: 1, maxFreeSockets: 1, - keepAliveTimeout: 1000, + freeSocketTimeout: 1000, }); const options = { @@ -822,7 +1337,7 @@ describe('test/agent.test.js', () => { port: 80, path: '/', method: 'GET', - agent: _keepaliveAgent, + agent, }; let index = 0; @@ -847,7 +1362,11 @@ describe('test/agent.test.js', () => { req.on('socket', sock => { // Listen to timeout and send another request immediately. sock.on('timeout', () => { - console.log('socket:%s timeout', sock._host); + console.log('free socket:%s timeout', sock._host); + assert(!sock.writable); + // sock has been removed from freeSockets list + assert(!agent.freeSockets[name]); + console.log('new request send'); getRequest().end(); }); }); @@ -1093,18 +1612,16 @@ describe('test/agent.test.js', () => { }); describe('options.socketActiveTTL', () => { - it('should expire active socket when it is out of ttl', done => { - const name = 'localhost:' + port + ':'; + it('should expire on free socket timeout when it is out of ttl', done => { const agent = new Agent({ keepAlive: true, - keepAliveMsecs: 1000, maxSockets: 5, maxFreeSockets: 5, timeout: 30000, freeSocketKeepAliveTimeout: 5000, - socketActiveTTL: 500, + socketActiveTTL: 100, }); - http.get({ + const req1 = http.get({ agent, port, path: '/', @@ -1112,9 +1629,11 @@ describe('test/agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const firstCreatedTime = agent.sockets[name].pop().createdTime; - setTimeout(function() { - http.get({ + const socket1 = req1.socket; + const firstCreatedTime = socket1[SOCKET_CREATED_TIME]; + assert(firstCreatedTime && typeof firstCreatedTime === 'number'); + setTimeout(() => { + const req2 = http.get({ agent, port, path: '/', @@ -1122,39 +1641,72 @@ describe('test/agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const currentCreatedTime = agent.sockets[name].pop().createdTime; + assert(req2.socket !== socket1); + const currentCreatedTime = req2.socket[SOCKET_CREATED_TIME]; + assert(currentCreatedTime && typeof currentCreatedTime === 'number'); assert(firstCreatedTime < currentCreatedTime); done(); }); }); - }, 600); + }, 200); }); }); + }); + it('should expire on socket reuse detect when it is out of ttl', done => { + const agent = new Agent({ + keepAlive: true, + socketActiveTTL: 10, + }); + const req1 = http.get({ + agent, + port, + path: '/?timeout=20', + }, res => { + const socket1 = req1.socket; + const firstCreatedTime = socket1[SOCKET_CREATED_TIME]; + assert(firstCreatedTime && typeof firstCreatedTime === 'number'); + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + setImmediate(() => { + const req2 = http.get({ + agent, + port, + path: '/', + }, res => { + // not the same socket + assert(req2.socket[SOCKET_NAME] !== socket1[SOCKET_NAME]); + const currentCreatedTime = req2.socket[SOCKET_CREATED_TIME]; + assert(currentCreatedTime && typeof currentCreatedTime === 'number'); + assert(firstCreatedTime < currentCreatedTime); + assert(res.statusCode === 200); + res.resume(); + res.on('end', done); + }); + }); + }); + }); }); it('should not expire active socket when it is in ttl', done => { - const name = 'localhost:' + port + ':'; const agent = new Agent({ - keepAlive: true, - keepAliveMsecs: 1000, - maxSockets: 5, - maxFreeSockets: 5, - timeout: 30000, - freeSocketKeepAliveTimeout: 5000, socketActiveTTL: 1000, }); - http.get({ + const req1 = http.get({ agent, port, path: '/', }, res => { + const socket1 = req1.socket; + const firstCreatedTime = socket1[SOCKET_CREATED_TIME]; + assert(firstCreatedTime && typeof firstCreatedTime === 'number'); assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const firstCreatedTime = agent.sockets[name].pop().createdTime; setTimeout(function() { - http.get({ + assert(socket1.timeout <= 1000); + const req2 = http.get({ agent, port, path: '/', @@ -1162,15 +1714,54 @@ describe('test/agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - const currentCreatedTime = agent.sockets[name].pop().createdTime; + assert(req2.socket[SOCKET_NAME] === socket1[SOCKET_NAME]); + const currentCreatedTime = req2.socket[SOCKET_CREATED_TIME]; + assert(currentCreatedTime && typeof currentCreatedTime === 'number'); assert(firstCreatedTime === currentCreatedTime); done(); }); }); - }, 600); + }, 100); }); }); + }); + it('should TTL diff > freeSocketTimeout', done => { + const agent = new Agent({ + freeSocketTimeout: 500, + socketActiveTTL: 1000, + }); + const req1 = http.get({ + agent, + port, + path: '/', + }, res => { + const socket1 = req1.socket; + const firstCreatedTime = socket1[SOCKET_CREATED_TIME]; + assert(firstCreatedTime && typeof firstCreatedTime === 'number'); + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + setTimeout(function() { + assert(socket1.timeout === 500); + const req2 = http.get({ + agent, + port, + path: '/', + }, res => { + assert(res.statusCode === 200); + res.resume(); + res.on('end', () => { + assert(req2.socket[SOCKET_NAME] === socket1[SOCKET_NAME]); + const currentCreatedTime = req2.socket[SOCKET_CREATED_TIME]; + assert(currentCreatedTime && typeof currentCreatedTime === 'number'); + assert(firstCreatedTime === currentCreatedTime); + done(); + }); + }); + }, 100); + }); + }); }); }); }); diff --git a/test/https_agent.test.js b/test/https_agent.test.js index 25a80c5..3c03467 100644 --- a/test/https_agent.test.js +++ b/test/https_agent.test.js @@ -10,7 +10,7 @@ describe('test/https_agent.test.js', () => { let app = null; let port = null; const agentkeepalive = new HttpsAgent({ - keepAliveTimeout: 1000, + freeSocketTimeout: 1000, timeout: 2000, maxSockets: 5, maxFreeSockets: 5, @@ -24,6 +24,7 @@ describe('test/https_agent.test.js', () => { res.destroy(); return; } else if (req.url === '/hang') { + console.log('[new https request] %s %s', req.method, req.url); // Wait forever. return; } @@ -61,7 +62,9 @@ describe('test/https_agent.test.js', () => { assert(res.statusCode === 200); res.resume(); res.on('end', () => { - process.nextTick(() => { + assert(Object.keys(agentkeepalive.sockets).length === 1); + assert(Object.keys(agentkeepalive.freeSockets).length === 0); + setImmediate(() => { assert(Object.keys(agentkeepalive.sockets).length === 0); assert(Object.keys(agentkeepalive.freeSockets).length === 1); done(); @@ -69,6 +72,7 @@ describe('test/https_agent.test.js', () => { }); }); assert(Object.keys(agentkeepalive.sockets).length === 1); + assert(Object.keys(agentkeepalive.freeSockets).length === 0); }); it('should free socket timeout', done => { diff --git a/test/test-http-agent-maxsockets-regress-4050.test.js b/test/test-http-agent-maxsockets-regress-4050.test.js index d500b97..cd9cd30 100644 --- a/test/test-http-agent-maxsockets-regress-4050.test.js +++ b/test/test-http-agent-maxsockets-regress-4050.test.js @@ -6,7 +6,7 @@ const assert = require('assert'); const http = require('http'); -describe('test/test-http-agent-maxsockets-regress-4050.test', () => { +describe('test/test-http-agent-maxsockets-regress-4050.test.js', () => { it('should keep active sockets <= MAX_SOCKETS when all requests abort', done => { const MAX_SOCKETS = 2;