Skip to content

Commit

Permalink
src: do not reuse async resource in http parsers
Browse files Browse the repository at this point in the history
Change resource being used, previously HTTParser was being reused.
We are now using IncomingMessage and ClientRequest objects.  The goal
here is to make the async resource unique for each async operatio

Refs: #24330
Refs: nodejs/diagnostics#248
Refs: #21313

Co-authored-by: Matheus Marchini <mat@mmarchini.me>

PR-URL: #25094
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benedikt Meurer <benedikt.meurer@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
Drieger authored and mmarchini committed Apr 22, 2019
1 parent 5aaf666 commit ece5073
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 32 deletions.
8 changes: 4 additions & 4 deletions doc/api/async_hooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,10 @@ The `type` is a string identifying the type of resource that caused
resource's constructor.

```text
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPPARSER,
JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP, SHUTDOWNWRAP,
SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP, TTYWRAP,
UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
FSEVENTWRAP, FSREQCALLBACK, GETADDRINFOREQWRAP, GETNAMEINFOREQWRAP, HTTPINCOMINGMESSAGE,
HTTPCLIENTREQUEST, JSSTREAM, PIPECONNECTWRAP, PIPEWRAP, PROCESSWRAP, QUERYWRAP,
SHUTDOWNWRAP, SIGNALWRAP, STATWATCHER, TCPCONNECTWRAP, TCPSERVERWRAP, TCPWRAP,
TTYWRAP, UDPSENDWRAP, UDPWRAP, WRITEWRAP, ZLIB, SSLCONNECTION, PBKDF2REQUEST,
RANDOMBYTESREQUEST, TLSWRAP, Microtask, Timeout, Immediate, TickObject
```

Expand Down
3 changes: 1 addition & 2 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -632,11 +632,10 @@ function emitFreeNT(socket) {
}

function tickOnSocket(req, socket) {
const isParserReused = parsers.hasItems();
const parser = parsers.alloc();
req.socket = socket;
req.connection = socket;
parser.reinitialize(HTTPParser.RESPONSE, isParserReused);
parser.initialize(HTTPParser.RESPONSE, req);
parser.socket = socket;
parser.outgoing = req;
req.parser = parser;
Expand Down
16 changes: 14 additions & 2 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ const STATUS_CODES = {

const kOnExecute = HTTPParser.kOnExecute | 0;

class HTTPServerAsyncResource {
constructor(type, socket) {
this.type = type;
this.socket = socket;
}
}

function ServerResponse(req) {
OutgoingMessage.call(this);
Expand Down Expand Up @@ -349,9 +355,15 @@ function connectionListenerInternal(server, socket) {
socket.setTimeout(server.timeout);
socket.on('timeout', socketOnTimeout);

const isParserReused = parsers.hasItems();
const parser = parsers.alloc();
parser.reinitialize(HTTPParser.REQUEST, isParserReused);

// TODO(addaleax): This doesn't play well with the
// `async_hooks.currentResource()` proposal, see
// https://github.com/nodejs/node/pull/21313
parser.initialize(
HTTPParser.REQUEST,
new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket)
);
parser.socket = socket;

// We are starting to wait for our headers.
Expand Down
5 changes: 5 additions & 0 deletions src/async_wrap-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ inline AsyncWrap::ProviderType AsyncWrap::provider_type() const {
return provider_type_;
}

inline AsyncWrap::ProviderType AsyncWrap::set_provider_type(
AsyncWrap::ProviderType provider) {
provider_type_ = provider;
return provider_type_;
}

inline double AsyncWrap::get_async_id() const {
return async_id_;
Expand Down
8 changes: 6 additions & 2 deletions src/async_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,11 +602,15 @@ void AsyncWrap::EmitDestroy(Environment* env, double async_id) {
env->destroy_async_id_list()->push_back(async_id);
}

void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
AsyncReset(object(), execution_async_id, silent);
}

// Generalized call for both the constructor and for handles that are pooled
// and reused over their lifetime. This way a new uid can be assigned when
// the resource is pulled out of the pool and put back into use.
void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {
void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
bool silent) {
if (async_id_ != -1) {
// This instance was in use before, we have already emitted an init with
// its previous async_id and need to emit a matching destroy for that
Expand Down Expand Up @@ -643,7 +647,7 @@ void AsyncWrap::AsyncReset(double execution_async_id, bool silent) {

if (silent) return;

EmitAsyncInit(env(), object(),
EmitAsyncInit(env(), resource,
env()->async_hooks()->provider_string(provider_type()),
async_id_, trigger_async_id_);
}
Expand Down
10 changes: 8 additions & 2 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ namespace node {
V(HTTP2STREAM) \
V(HTTP2PING) \
V(HTTP2SETTINGS) \
V(HTTPPARSER) \
V(HTTPINCOMINGMESSAGE) \
V(HTTPCLIENTREQUEST) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
Expand Down Expand Up @@ -147,11 +148,16 @@ class AsyncWrap : public BaseObject {
static void DestroyAsyncIdsCallback(Environment* env, void* data);

inline ProviderType provider_type() const;
inline ProviderType set_provider_type(ProviderType provider);

inline double get_async_id() const;

inline double get_trigger_async_id() const;

void AsyncReset(v8::Local<v8::Object> resource,
double execution_async_id = -1,
bool silent = false);

void AsyncReset(double execution_async_id = -1, bool silent = false);

// Only call these within a valid HandleScope.
Expand Down Expand Up @@ -202,7 +208,7 @@ class AsyncWrap : public BaseObject {
ProviderType provider,
double execution_async_id,
bool silent);
const ProviderType provider_type_;
ProviderType provider_type_;
// Because the values may be Reset(), cannot be made const.
double async_id_ = -1;
double trigger_async_id_;
Expand Down
28 changes: 15 additions & 13 deletions src/node_http_parser_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,13 @@ struct StringPtr {
size_t size_;
};


class Parser : public AsyncWrap, public StreamListener {
public:
Parser(Environment* env, Local<Object> wrap, parser_type_t type)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_HTTPPARSER),
: AsyncWrap(env, wrap,
type == HTTP_REQUEST ?
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE :
AsyncWrap::PROVIDER_HTTPCLIENTREQUEST),
current_buffer_len_(0),
current_buffer_data_(nullptr) {
Init(type);
Expand Down Expand Up @@ -503,12 +505,12 @@ class Parser : public AsyncWrap, public StreamListener {
}


static void Reinitialize(const FunctionCallbackInfo<Value>& args) {
static void Initialize(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);

CHECK(args[0]->IsInt32());
CHECK(args[1]->IsBoolean());
bool isReused = args[1]->IsTrue();
CHECK(args[1]->IsObject());

parser_type_t type =
static_cast<parser_type_t>(args[0].As<Int32>()->Value());

Expand All @@ -517,16 +519,16 @@ class Parser : public AsyncWrap, public StreamListener {
ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
// Should always be called from the same context.
CHECK_EQ(env, parser->env());
// This parser has either just been created or it is being reused.
// We must only call AsyncReset for the latter case, because AsyncReset has
// already been called via the constructor for the former case.
if (isReused) {
parser->AsyncReset();
}

AsyncWrap::ProviderType provider =
(type == HTTP_REQUEST ?
AsyncWrap::PROVIDER_HTTPINCOMINGMESSAGE
: AsyncWrap::PROVIDER_HTTPCLIENTREQUEST);

parser->set_provider_type(provider);
parser->Init(type);
}


template <bool should_pause>
static void Pause(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Expand Down Expand Up @@ -958,7 +960,7 @@ void InitializeHttpParser(Local<Object> target,
env->SetProtoMethod(t, "free", Parser::Free);
env->SetProtoMethod(t, "execute", Parser::Execute);
env->SetProtoMethod(t, "finish", Parser::Finish);
env->SetProtoMethod(t, "reinitialize", Parser::Reinitialize);
env->SetProtoMethod(t, "initialize", Parser::Initialize);
env->SetProtoMethod(t, "pause", Parser::Pause<true>);
env->SetProtoMethod(t, "resume", Parser::Pause<false>);
env->SetProtoMethod(t, "consume", Parser::Consume);
Expand Down
39 changes: 39 additions & 0 deletions test/async-hooks/test-httparser-reuse.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'use strict';

const common = require('../common');
const http = require('http');
const assert = require('assert');
const { createHook } = require('async_hooks');
const reused = Symbol('reused');

let reusedHTTPParser = false;
const asyncHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
if (resource[reused]) {
reusedHTTPParser = true;
}
resource[reused] = true;
}
});
asyncHook.enable();

const server = http.createServer(function(req, res) {
res.end();
});

const PORT = 3000;
const url = 'http://127.0.0.1:' + PORT;

server.listen(PORT, common.mustCall(() => {
http.get(url, common.mustCall(() => {
server.close(common.mustCall(() => {
server.listen(PORT, common.mustCall(() => {
http.get(url, common.mustCall(() => {
server.close(common.mustCall(() => {
assert.strictEqual(reusedHTTPParser, false);
}));
}));
}));
}));
}));
}));
4 changes: 2 additions & 2 deletions test/async-hooks/test-httpparser.request.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const request = Buffer.from(
);

const parser = new HTTPParser(REQUEST);
const as = hooks.activitiesOfTypes('HTTPPARSER');
const as = hooks.activitiesOfTypes('HTTPINCOMINGMESSAGE');
const httpparser = as[0];

assert.strictEqual(as.length, 1);
Expand All @@ -47,7 +47,7 @@ process.on('exit', onexit);

function onexit() {
hooks.disable();
hooks.sanityCheck('HTTPPARSER');
hooks.sanityCheck('HTTPINCOMINGMESSAGE');
checkInvocations(httpparser, { init: 1, before: 1, after: 1, destroy: 1 },
'when process exits');
}
4 changes: 2 additions & 2 deletions test/async-hooks/test-httpparser.response.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const request = Buffer.from(
);

const parser = new HTTPParser(RESPONSE);
const as = hooks.activitiesOfTypes('HTTPPARSER');
const as = hooks.activitiesOfTypes('HTTPCLIENTREQUEST');
const httpparser = as[0];

assert.strictEqual(as.length, 1);
Expand Down Expand Up @@ -58,7 +58,7 @@ process.on('exit', onexit);

function onexit() {
hooks.disable();
hooks.sanityCheck('HTTPPARSER');
hooks.sanityCheck('HTTPCLIENTREQUEST');
checkInvocations(httpparser, { init: 1, before: 2, after: 2, destroy: 1 },
'when process exits');
}
4 changes: 2 additions & 2 deletions test/parallel/test-http-parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ function expectBody(expected) {
throw new Error('hello world');
};

parser.reinitialize(HTTPParser.REQUEST, false);
parser.initialize(HTTPParser.REQUEST, request);

assert.throws(
() => { parser.execute(request, 0, request.length); },
Expand Down Expand Up @@ -555,7 +555,7 @@ function expectBody(expected) {
parser[kOnBody] = expectBody('ping');
parser.execute(req1, 0, req1.length);

parser.reinitialize(REQUEST, false);
parser.initialize(REQUEST, req2);
parser[kOnBody] = expectBody('pong');
parser[kOnHeadersComplete] = onHeadersComplete2;
parser.execute(req2, 0, req2.length);
Expand Down
2 changes: 2 additions & 0 deletions test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const { getSystemErrorName } = require('util');
if (!common.isMainThread)
delete providers.INSPECTORJSBINDING;
delete providers.KEYPAIRGENREQUEST;
delete providers.HTTPCLIENTREQUEST;
delete providers.HTTPINCOMINGMESSAGE;

const objKeys = Object.keys(providers);
if (objKeys.length > 0)
Expand Down
3 changes: 2 additions & 1 deletion test/sequential/test-http-regr-gh-2928.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const common = require('../common');
const assert = require('assert');
const httpCommon = require('_http_common');
const { HTTPParser } = require('_http_common');
const { AsyncResource } = require('async_hooks');
const net = require('net');

const COUNT = httpCommon.parsers.max + 1;
Expand All @@ -24,7 +25,7 @@ function execAndClose() {
process.stdout.write('.');

const parser = parsers.pop();
parser.reinitialize(HTTPParser.RESPONSE, !!parser.reused);
parser.initialize(HTTPParser.RESPONSE, new AsyncResource('ClientRequest'));

const socket = net.connect(common.PORT);
socket.on('error', (e) => {
Expand Down

0 comments on commit ece5073

Please sign in to comment.