diff --git a/doc/api/http.markdown b/doc/api/http.markdown index e6d53cb3edc212..ed2e4cfb118ed5 100644 --- a/doc/api/http.markdown +++ b/doc/api/http.markdown @@ -117,6 +117,18 @@ options.agent = keepAliveAgent; http.request(options, onResponseCallback); ``` +### agent.createConnection(options[, callback]) + +Produces a socket/stream to be used for HTTP requests. + +By default, this function is the same as [`net.createConnection()`][]. However, +custom Agents may override this method in case greater flexibility is desired. + +A socket/stream can be supplied in one of two ways: by returning the +socket/stream from this function, or by passing the socket/stream to `callback`. + +`callback` has a signature of `(err, stream)`. + ### agent.destroy() Destroy any sockets that are currently in use by the agent. @@ -1112,6 +1124,10 @@ Options: - `Agent` object: explicitly use the passed in `Agent`. - `false`: opts out of connection pooling with an Agent, defaults request to `Connection: close`. +- `createConnection`: A function that produces a socket/stream to use for the + request when the `agent` option is not used. This can be used to avoid + creating a custom Agent class just to override the default `createConnection` + function. See [`agent.createConnection()`][] for more details. The optional `callback` parameter will be added as a one time listener for the `'response'` event. @@ -1187,6 +1203,7 @@ There are a few special headers that should be noted. [`'listening'`]: net.html#net_event_listening [`'response'`]: #http_event_response [`Agent`]: #http_class_http_agent +[`agent.createConnection`]: #http_agent_createconnection [`Buffer`]: buffer.html#buffer_buffer [`destroy()`]: #http_agent_destroy [`EventEmitter`]: events.html#events_class_events_eventemitter @@ -1198,6 +1215,7 @@ There are a few special headers that should be noted. [`http.Server`]: #http_class_http_server [`http.ServerResponse`]: #http_class_http_serverresponse [`message.headers`]: #http_message_headers +[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener [`net.Server`]: net.html#net_class_net_server [`net.Server.close()`]: net.html#net_server_close_callback [`net.Server.listen()`]: net.html#net_server_listen_handle_callback diff --git a/lib/_http_agent.js b/lib/_http_agent.js index ddb1c5bfff9b63..5828927786288f 100644 --- a/lib/_http_agent.js +++ b/lib/_http_agent.js @@ -44,7 +44,7 @@ function Agent(options) { var name = self.getName(options); debug('agent.on(free)', name); - if (!socket.destroyed && + if (socket.writable && self.requests[name] && self.requests[name].length) { self.requests[name].shift().onSocket(socket); if (self.requests[name].length === 0) { @@ -57,7 +57,7 @@ function Agent(options) { var req = socket._httpMessage; if (req && req.shouldKeepAlive && - !socket.destroyed && + socket.writable && self.keepAlive) { var freeSockets = self.freeSockets[name]; var freeLen = freeSockets ? freeSockets.length : 0; @@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) { } else if (sockLen < this.maxSockets) { debug('call onSocket', sockLen, freeLen); // If we are under maxSockets create a new one. - req.onSocket(this.createSocket(req, options)); + this.createSocket(req, options, function(err, newSocket) { + if (err) { + process.nextTick(function() { + req.emit('error', err); + }); + return; + } + req.onSocket(newSocket); + }); } else { debug('wait for socket'); // We are over limit so we'll add it to the queue. @@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) { } }; -Agent.prototype.createSocket = function(req, options) { +Agent.prototype.createSocket = function(req, options, cb) { var self = this; options = util._extend({}, options); options = util._extend(options, self.options); if (!options.servername) { options.servername = options.host; - if (req) { - var hostHeader = req.getHeader('host'); - if (hostHeader) { - options.servername = hostHeader.replace(/:.*$/, ''); - } + const hostHeader = req.getHeader('host'); + if (hostHeader) { + options.servername = hostHeader.replace(/:.*$/, ''); } } @@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) { debug('createConnection', name, options); options.encoding = null; - var s = self.createConnection(options); - if (!self.sockets[name]) { - self.sockets[name] = []; - } - this.sockets[name].push(s); - debug('sockets', name, this.sockets[name].length); + var called = false; + const newSocket = self.createConnection(options, oncreate); + if (newSocket) + oncreate(null, newSocket); + function oncreate(err, s) { + if (called) + return; + called = true; + if (err) + return cb(err); + if (!self.sockets[name]) { + self.sockets[name] = []; + } + self.sockets[name].push(s); + debug('sockets', name, self.sockets[name].length); - function onFree() { - self.emit('free', s, options); - } - s.on('free', onFree); - - function onClose(err) { - debug('CLIENT socket onClose'); - // This is the only place where sockets get removed from the Agent. - // If you want to remove a socket from the pool, just close it. - // All socket errors end in a close event anyway. - self.removeSocket(s, options); - } - s.on('close', onClose); - - function onRemove() { - // We need this function for cases like HTTP 'upgrade' - // (defined by WebSockets) where we need to remove a socket from the - // pool because it'll be locked up indefinitely - debug('CLIENT socket onRemove'); - self.removeSocket(s, options); - s.removeListener('close', onClose); - s.removeListener('free', onFree); - s.removeListener('agentRemove', onRemove); + function onFree() { + self.emit('free', s, options); + } + s.on('free', onFree); + + function onClose(err) { + debug('CLIENT socket onClose'); + // This is the only place where sockets get removed from the Agent. + // If you want to remove a socket from the pool, just close it. + // All socket errors end in a close event anyway. + self.removeSocket(s, options); + } + s.on('close', onClose); + + function onRemove() { + // We need this function for cases like HTTP 'upgrade' + // (defined by WebSockets) where we need to remove a socket from the + // pool because it'll be locked up indefinitely + debug('CLIENT socket onRemove'); + self.removeSocket(s, options); + s.removeListener('close', onClose); + s.removeListener('free', onFree); + s.removeListener('agentRemove', onRemove); + } + s.on('agentRemove', onRemove); + cb(null, s); } - s.on('agentRemove', onRemove); - return s; }; Agent.prototype.removeSocket = function(s, options) { var name = this.getName(options); - debug('removeSocket', name, 'destroyed:', s.destroyed); + debug('removeSocket', name, 'writable:', s.writable); var sets = [this.sockets]; // If the socket was destroyed, remove it from the free buffers too. - if (s.destroyed) + if (!s.writable) sets.push(this.freeSockets); for (var sk = 0; sk < sets.length; sk++) { @@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) { debug('removeSocket, have a request, make a socket'); var req = this.requests[name][0]; // If we have pending requests and a socket gets closed make a new one - this.createSocket(req, options).emit('free'); + this.createSocket(req, options, function(err, newSocket) { + if (err) { + process.nextTick(function() { + req.emit('error', err); + }); + return; + } + newSocket.emit('free'); + }); } }; diff --git a/lib/_http_client.js b/lib/_http_client.js index 2253b47c91288f..79074ef2113413 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -33,7 +33,7 @@ function ClientRequest(options, cb) { if (agent === false) { agent = new defaultAgent.constructor(); } else if ((agent === null || agent === undefined) && - !options.createConnection) { + typeof options.createConnection !== 'function') { agent = defaultAgent; } self.agent = agent; @@ -118,10 +118,20 @@ function ClientRequest(options, cb) { self._renderHeaders()); } + var called = false; if (self.socketPath) { self._last = true; self.shouldKeepAlive = false; - self.onSocket(self.agent.createConnection({ path: self.socketPath })); + const optionsPath = { + path: self.socketPath + }; + const newSocket = self.agent.createConnection(optionsPath, oncreate); + if (newSocket && !called) { + called = true; + self.onSocket(newSocket); + } else { + return; + } } else if (self.agent) { // If there is an agent we should default to Connection:keep-alive, // but only if the Agent will actually reuse the connection! @@ -139,14 +149,37 @@ function ClientRequest(options, cb) { // No agent, default to Connection:close. self._last = true; self.shouldKeepAlive = false; - if (options.createConnection) { - self.onSocket(options.createConnection(options)); + if (typeof options.createConnection === 'function') { + const newSocket = options.createConnection(options, oncreate); + if (newSocket && !called) { + called = true; + self.onSocket(newSocket); + } else { + return; + } } else { debug('CLIENT use net.createConnection', options); self.onSocket(net.createConnection(options)); } } + function oncreate(err, socket) { + if (called) + return; + called = true; + if (err) { + process.nextTick(function() { + self.emit('error', err); + }); + return; + } + self.onSocket(socket); + self._deferToConnect(null, null, function() { + self._flush(); + self = null; + }); + } + self._deferToConnect(null, null, function() { self._flush(); self = null; diff --git a/test/parallel/test-http-createConnection.js b/test/parallel/test-http-createConnection.js index 48a7d7dbe68ea3..1b7376d1287dd6 100644 --- a/test/parallel/test-http-createConnection.js +++ b/test/parallel/test-http-createConnection.js @@ -1,27 +1,61 @@ 'use strict'; -var common = require('../common'); -var assert = require('assert'); -var http = require('http'); -var net = require('net'); +const common = require('../common'); +const http = require('http'); +const net = require('net'); +const assert = require('assert'); -var create = 0; -var response = 0; -process.on('exit', function() { - assert.equal(1, create, 'createConnection() http option was not called'); - assert.equal(1, response, 'http server "request" callback was not called'); -}); - -var server = http.createServer(function(req, res) { +const server = http.createServer(common.mustCall(function(req, res) { res.end(); - response++; -}).listen(common.PORT, '127.0.0.1', function() { - http.get({ createConnection: createConnection }, function(res) { +}, 4)).listen(common.PORT, '127.0.0.1', function() { + let fn = common.mustCall(createConnection); + http.get({ createConnection: fn }, function(res) { res.resume(); - server.close(); + fn = common.mustCall(createConnectionAsync); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionBoth1); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionBoth2); + http.get({ createConnection: fn }, function(res) { + res.resume(); + fn = common.mustCall(createConnectionError); + http.get({ createConnection: fn }, function(res) { + assert.fail(null, null, 'Unexpected response callback'); + }).on('error', common.mustCall(function(err) { + assert.equal(err.message, 'Could not create socket'); + server.close(); + })); + }); + }); + }); }); }); function createConnection() { - create++; return net.createConnection(common.PORT, '127.0.0.1'); } + +function createConnectionAsync(options, cb) { + setImmediate(function() { + cb(null, net.createConnection(common.PORT, '127.0.0.1')); + }); +} + +function createConnectionBoth1(options, cb) { + const socket = net.createConnection(common.PORT, '127.0.0.1'); + setImmediate(function() { + cb(null, socket); + }); + return socket; +} + +function createConnectionBoth2(options, cb) { + const socket = net.createConnection(common.PORT, '127.0.0.1'); + cb(null, socket); + return socket; +} + +function createConnectionError(options, cb) { + process.nextTick(cb, new Error('Could not create socket')); +}