Skip to content

Commit

Permalink
lib: add diagnostic channels to http2
Browse files Browse the repository at this point in the history
  • Loading branch information
santigimeno committed Jan 24, 2025
1 parent 0246f46 commit c19c5b2
Showing 1 changed file with 92 additions and 1 deletion.
93 changes: 92 additions & 1 deletion lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ const { UV_EOF } = internalBinding('uv');

const { StreamPipe } = internalBinding('stream_pipe');
const { _connectionListener: httpConnectionListener } = http;

const dc = require('diagnostics_channel');
const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created');
const onClientStreamStartChannel = dc.channel('http2.client.stream.start');
const onClientStreamErrorChannel = dc.channel('http2.client.stream.error');
const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish');
const onClientStreamCloseChannel = dc.channel('http2.client.stream.close');
const onServerStreamStartChannel = dc.channel('http2.server.stream.start');
const onServerStreamErrorChannel = dc.channel('http2.server.stream.error');
const onServerStreamFinishChannel = dc.channel('http2.server.stream.finish');
const onServerStreamCloseChannel = dc.channel('http2.server.stream.close');

let debug = require('internal/util/debuglog').debuglog('http2', (fn) => {
debug = fn;
});
Expand Down Expand Up @@ -375,9 +387,22 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
stream.end();
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;
}

if (onServerStreamStartChannel.hasSubscribers) {
onServerStreamStartChannel.publish({
stream,
headers,
});
}
} else {
// eslint-disable-next-line no-use-before-define
stream = new ClientHttp2Stream(session, handle, id, {});
if (onClientStreamCreatedChannel.hasSubscribers) {
onClientStreamCreatedChannel.publish({
stream,
});
}

if (endOfStream) {
stream.push(null);
}
Expand Down Expand Up @@ -416,6 +441,16 @@ function onSessionHeaders(handle, id, cat, flags, headers, sensitiveHeaders) {
reqAsync.runInAsyncScope(process.nextTick, null, emit, stream, event, obj, flags, headers);
else
process.nextTick(emit, stream, event, obj, flags, headers);

if (event === 'response') {
if (onClientStreamFinishChannel.hasSubscribers) {
onClientStreamFinishChannel.publish({
stream,
headers,
flags,
});
}
}
}
if (endOfStream) {
stream.push(null);
Expand Down Expand Up @@ -766,7 +801,14 @@ function requestOnConnect(headers, options) {
}
return;
}

this[kInit](ret.id(), ret);
if (onClientStreamStartChannel.hasSubscribers) {
onClientStreamStartChannel.publish({
stream: this,
headers,
});
}
}

// Validates that priority options are correct, specifically:
Expand Down Expand Up @@ -1851,6 +1893,13 @@ class ClientHttp2Session extends Http2Session {
} else {
onConnect();
}

if (onClientStreamCreatedChannel.hasSubscribers) {
onClientStreamCreatedChannel.publish({
stream,
});
}

return stream;
}
}
Expand Down Expand Up @@ -1925,6 +1974,7 @@ const kSubmitRstStream = 1;
const kForceRstStream = 2;

function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
const type = stream.session[kType];
const state = stream[kState];
state.flags |= STREAM_FLAGS_CLOSED;
state.rstCode = code;
Expand Down Expand Up @@ -1955,6 +2005,20 @@ function closeStream(stream, code, rstStreamStatus = kSubmitRstStream) {
else
stream.once('finish', finishFn);
}

if (type === NGHTTP2_SESSION_CLIENT) {
if (onClientStreamCloseChannel.hasSubscribers) {
onClientStreamCloseChannel.publish({
stream,
code,
});
}
} else if (onServerStreamCloseChannel.hasSubscribers) {
onServerStreamCloseChannel.publish({
stream,
code,
});
}
}

function finishCloseStream(code) {
Expand Down Expand Up @@ -2381,6 +2445,21 @@ class Http2Stream extends Duplex {
setImmediate(() => {
session[kMaybeDestroy]();
});
if (err) {
if (session[kType] === NGHTTP2_SESSION_CLIENT) {
if (onClientStreamErrorChannel.hasSubscribers) {
onClientStreamErrorChannel.publish({
stream: this,
error: err,
});
}
} else if (onServerStreamErrorChannel.hasSubscribers) {
onServerStreamErrorChannel.publish({
stream: this,
error: err,
});
}
}
callback(err);
}
// The Http2Stream can be destroyed if it has closed and if the readable
Expand Down Expand Up @@ -2766,6 +2845,13 @@ class ServerHttp2Stream extends Http2Stream {
stream[kState].flags |= STREAM_FLAGS_HEAD_REQUEST;

process.nextTick(callback, null, stream, headers, 0);

if (onServerStreamStartChannel.hasSubscribers) {
onServerStreamStartChannel.publish({
stream,
headers,
});
}
}

// Initiate a response on this Http2Stream
Expand Down Expand Up @@ -2813,8 +2899,13 @@ class ServerHttp2Stream extends Http2Stream {
}

const ret = this[kHandle].respond(headersList, streamOptions);
if (ret < 0)
if (ret < 0) {
this.destroy(new NghttpError(ret));
} else if (onServerStreamFinishChannel.hasSubscribers) {
onServerStreamFinishChannel.publish({
stream: this,
});
}
}

// Initiate a response using an open FD. Note that there are fewer
Expand Down

0 comments on commit c19c5b2

Please sign in to comment.