Skip to content

Commit

Permalink
http: allow async createConnection()
Browse files Browse the repository at this point in the history
This commit adds support for async createConnection()
implementations and is still backwards compatible with
synchronous createConnection() implementations.

This commit also makes the http client more friendly with
generic stream objects produced by createConnection() by
checking stream.writable instead of stream.destroyed as the
latter is currently a net.Socket-ism and not set by the core
stream implementations.

PR-URL: nodejs#4638
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
mscdex authored and stefanmb committed Feb 23, 2016
1 parent 6438449 commit 1d29a46
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 64 deletions.
18 changes: 18 additions & 0 deletions doc/api/http.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ options.agent = keepAliveAgent;
http.request(options, onResponseCallback);
```

### agent.createConnection(options[, callback])

Produces a socket/stream to be used for HTTP requests.

By default, this function is the same as [`net.createConnection()`][]. However,
custom Agents may override this method in case greater flexibility is desired.

A socket/stream can be supplied in one of two ways: by returning the
socket/stream from this function, or by passing the socket/stream to `callback`.

`callback` has a signature of `(err, stream)`.

### agent.destroy()

Destroy any sockets that are currently in use by the agent.
Expand Down Expand Up @@ -1117,6 +1129,10 @@ Options:
- `Agent` object: explicitly use the passed in `Agent`.
- `false`: opts out of connection pooling with an Agent, defaults request to
`Connection: close`.
- `createConnection`: A function that produces a socket/stream to use for the
request when the `agent` option is not used. This can be used to avoid
creating a custom Agent class just to override the default `createConnection`
function. See [`agent.createConnection()`][] for more details.

The optional `callback` parameter will be added as a one time listener for
the `'response'` event.
Expand Down Expand Up @@ -1192,6 +1208,7 @@ There are a few special headers that should be noted.
[`'listening'`]: net.html#net_event_listening
[`'response'`]: #http_event_response
[`Agent`]: #http_class_http_agent
[`agent.createConnection`]: #http_agent_createconnection
[`Buffer`]: buffer.html#buffer_buffer
[`destroy()`]: #http_agent_destroy
[`EventEmitter`]: events.html#events_class_events_eventemitter
Expand All @@ -1203,6 +1220,7 @@ There are a few special headers that should be noted.
[`http.Server`]: #http_class_http_server
[`http.ServerResponse`]: #http_class_http_serverresponse
[`message.headers`]: #http_message_headers
[`net.createConnection`]: net.html#net_net_createconnection_options_connectlistener
[`net.Server`]: net.html#net_class_net_server
[`net.Server.close()`]: net.html#net_server_close_callback
[`net.Server.listen()`]: net.html#net_server_listen_handle_callback
Expand Down
110 changes: 67 additions & 43 deletions lib/_http_agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ function Agent(options) {
var name = self.getName(options);
debug('agent.on(free)', name);

if (!socket.destroyed &&
if (socket.writable &&
self.requests[name] && self.requests[name].length) {
self.requests[name].shift().onSocket(socket);
if (self.requests[name].length === 0) {
Expand All @@ -57,7 +57,7 @@ function Agent(options) {
var req = socket._httpMessage;
if (req &&
req.shouldKeepAlive &&
!socket.destroyed &&
socket.writable &&
self.keepAlive) {
var freeSockets = self.freeSockets[name];
var freeLen = freeSockets ? freeSockets.length : 0;
Expand Down Expand Up @@ -138,7 +138,15 @@ Agent.prototype.addRequest = function(req, options) {
} else if (sockLen < this.maxSockets) {
debug('call onSocket', sockLen, freeLen);
// If we are under maxSockets create a new one.
req.onSocket(this.createSocket(req, options));
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
req.onSocket(newSocket);
});
} else {
debug('wait for socket');
// We are over limit so we'll add it to the queue.
Expand All @@ -149,18 +157,16 @@ Agent.prototype.addRequest = function(req, options) {
}
};

Agent.prototype.createSocket = function(req, options) {
Agent.prototype.createSocket = function(req, options, cb) {
var self = this;
options = util._extend({}, options);
options = util._extend(options, self.options);

if (!options.servername) {
options.servername = options.host;
if (req) {
var hostHeader = req.getHeader('host');
if (hostHeader) {
options.servername = hostHeader.replace(/:.*$/, '');
}
const hostHeader = req.getHeader('host');
if (hostHeader) {
options.servername = hostHeader.replace(/:.*$/, '');
}
}

Expand All @@ -169,48 +175,58 @@ Agent.prototype.createSocket = function(req, options) {

debug('createConnection', name, options);
options.encoding = null;
var s = self.createConnection(options);
if (!self.sockets[name]) {
self.sockets[name] = [];
}
this.sockets[name].push(s);
debug('sockets', name, this.sockets[name].length);
var called = false;
const newSocket = self.createConnection(options, oncreate);
if (newSocket)
oncreate(null, newSocket);
function oncreate(err, s) {
if (called)
return;
called = true;
if (err)
return cb(err);
if (!self.sockets[name]) {
self.sockets[name] = [];
}
self.sockets[name].push(s);
debug('sockets', name, self.sockets[name].length);

function onFree() {
self.emit('free', s, options);
}
s.on('free', onFree);

function onClose(err) {
debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway.
self.removeSocket(s, options);
}
s.on('close', onClose);

function onRemove() {
// We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove');
self.removeSocket(s, options);
s.removeListener('close', onClose);
s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove);
function onFree() {
self.emit('free', s, options);
}
s.on('free', onFree);

function onClose(err) {
debug('CLIENT socket onClose');
// This is the only place where sockets get removed from the Agent.
// If you want to remove a socket from the pool, just close it.
// All socket errors end in a close event anyway.
self.removeSocket(s, options);
}
s.on('close', onClose);

function onRemove() {
// We need this function for cases like HTTP 'upgrade'
// (defined by WebSockets) where we need to remove a socket from the
// pool because it'll be locked up indefinitely
debug('CLIENT socket onRemove');
self.removeSocket(s, options);
s.removeListener('close', onClose);
s.removeListener('free', onFree);
s.removeListener('agentRemove', onRemove);
}
s.on('agentRemove', onRemove);
cb(null, s);
}
s.on('agentRemove', onRemove);
return s;
};

Agent.prototype.removeSocket = function(s, options) {
var name = this.getName(options);
debug('removeSocket', name, 'destroyed:', s.destroyed);
debug('removeSocket', name, 'writable:', s.writable);
var sets = [this.sockets];

// If the socket was destroyed, remove it from the free buffers too.
if (s.destroyed)
if (!s.writable)
sets.push(this.freeSockets);

for (var sk = 0; sk < sets.length; sk++) {
Expand All @@ -231,7 +247,15 @@ Agent.prototype.removeSocket = function(s, options) {
debug('removeSocket, have a request, make a socket');
var req = this.requests[name][0];
// If we have pending requests and a socket gets closed make a new one
this.createSocket(req, options).emit('free');
this.createSocket(req, options, function(err, newSocket) {
if (err) {
process.nextTick(function() {
req.emit('error', err);
});
return;
}
newSocket.emit('free');
});
}
};

Expand Down
41 changes: 37 additions & 4 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ function ClientRequest(options, cb) {
if (agent === false) {
agent = new defaultAgent.constructor();
} else if ((agent === null || agent === undefined) &&
!options.createConnection) {
typeof options.createConnection !== 'function') {
agent = defaultAgent;
}
self.agent = agent;
Expand Down Expand Up @@ -118,10 +118,20 @@ function ClientRequest(options, cb) {
self._renderHeaders());
}

var called = false;
if (self.socketPath) {
self._last = true;
self.shouldKeepAlive = false;
self.onSocket(self.agent.createConnection({ path: self.socketPath }));
const optionsPath = {
path: self.socketPath
};
const newSocket = self.agent.createConnection(optionsPath, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else if (self.agent) {
// If there is an agent we should default to Connection:keep-alive,
// but only if the Agent will actually reuse the connection!
Expand All @@ -139,14 +149,37 @@ function ClientRequest(options, cb) {
// No agent, default to Connection:close.
self._last = true;
self.shouldKeepAlive = false;
if (options.createConnection) {
self.onSocket(options.createConnection(options));
if (typeof options.createConnection === 'function') {
const newSocket = options.createConnection(options, oncreate);
if (newSocket && !called) {
called = true;
self.onSocket(newSocket);
} else {
return;
}
} else {
debug('CLIENT use net.createConnection', options);
self.onSocket(net.createConnection(options));
}
}

function oncreate(err, socket) {
if (called)
return;
called = true;
if (err) {
process.nextTick(function() {
self.emit('error', err);
});
return;
}
self.onSocket(socket);
self._deferToConnect(null, null, function() {
self._flush();
self = null;
});
}

self._deferToConnect(null, null, function() {
self._flush();
self = null;
Expand Down
68 changes: 51 additions & 17 deletions test/parallel/test-http-createConnection.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,61 @@
'use strict';
var common = require('../common');
var assert = require('assert');
var http = require('http');
var net = require('net');
const common = require('../common');
const http = require('http');
const net = require('net');
const assert = require('assert');

var create = 0;
var response = 0;
process.on('exit', function() {
assert.equal(1, create, 'createConnection() http option was not called');
assert.equal(1, response, 'http server "request" callback was not called');
});

var server = http.createServer(function(req, res) {
const server = http.createServer(common.mustCall(function(req, res) {
res.end();
response++;
}).listen(common.PORT, '127.0.0.1', function() {
http.get({ createConnection: createConnection }, function(res) {
}, 4)).listen(common.PORT, '127.0.0.1', function() {
let fn = common.mustCall(createConnection);
http.get({ createConnection: fn }, function(res) {
res.resume();
server.close();
fn = common.mustCall(createConnectionAsync);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth1);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionBoth2);
http.get({ createConnection: fn }, function(res) {
res.resume();
fn = common.mustCall(createConnectionError);
http.get({ createConnection: fn }, function(res) {
assert.fail(null, null, 'Unexpected response callback');
}).on('error', common.mustCall(function(err) {
assert.equal(err.message, 'Could not create socket');
server.close();
}));
});
});
});
});
});

function createConnection() {
create++;
return net.createConnection(common.PORT, '127.0.0.1');
}

function createConnectionAsync(options, cb) {
setImmediate(function() {
cb(null, net.createConnection(common.PORT, '127.0.0.1'));
});
}

function createConnectionBoth1(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
setImmediate(function() {
cb(null, socket);
});
return socket;
}

function createConnectionBoth2(options, cb) {
const socket = net.createConnection(common.PORT, '127.0.0.1');
cb(null, socket);
return socket;
}

function createConnectionError(options, cb) {
process.nextTick(cb, new Error('Could not create socket'));
}

0 comments on commit 1d29a46

Please sign in to comment.