Skip to content

Commit

Permalink
http2: only schedule write when necessary
Browse files Browse the repository at this point in the history
Introduce an `Http2Scope` class that, when it goes out of scope,
checks whether a write to the network is desired by nghttp2.
If that is the case, schedule a write using `SetImmediate()`
rather than a custom per-session libuv handle.
  • Loading branch information
addaleax committed Nov 21, 2017
1 parent 1e3b395 commit 0e14510
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 41 deletions.
101 changes: 61 additions & 40 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ const Http2Session::Callbacks Http2Session::callback_struct_saved[2] = {
Callbacks(false),
Callbacks(true)};

Http2Scope::Http2Scope(Http2Stream* stream) : Http2Scope(stream->session()) {}

Http2Scope::Http2Scope(Http2Session* session) {
if (session->flags_ & (SESSION_STATE_HAS_SCOPE |
SESSION_STATE_WRITE_SCHEDULED)) {
// There is another scope further below on the stack, or it is already
// known that a write is scheduled. In either case, there is nothing to do.
return;
}
session->flags_ |= SESSION_STATE_HAS_SCOPE;
session_ = session;
}

Http2Scope::~Http2Scope() {
if (session_ == nullptr)
return;

session_->flags_ &= ~SESSION_STATE_HAS_SCOPE;
session_->MaybeScheduleWrite();
}

Http2Options::Http2Options(Environment* env) {
nghttp2_option_new(&options_);
Expand Down Expand Up @@ -346,8 +366,6 @@ Http2Session::Http2Session(Environment* env,
// be catching before it gets this far. Either way, crash if this
// fails.
CHECK_EQ(fn(&session_, callbacks, this, *opts), 0);

Start();
}


Expand All @@ -356,40 +374,6 @@ Http2Session::~Http2Session() {
Close();
}

// For every node::Http2Session instance, there is a uv_prepare_t handle
// whose callback is triggered on every tick of the event loop. When
// run, nghttp2 is prompted to send any queued data it may have stored.
// TODO(jasnell): Currently, this creates one uv_prepare_t per Http2Session,
// we should investigate to see if it's faster to create a
// single uv_prepare_t for all Http2Sessions, then iterate
// over each.
void Http2Session::Start() {
prep_ = new uv_prepare_t();
uv_prepare_init(env()->event_loop(), prep_);
prep_->data = static_cast<void*>(this);
uv_prepare_start(prep_, [](uv_prepare_t* t) {
Http2Session* session = static_cast<Http2Session*>(t->data);
HandleScope scope(session->env()->isolate());
Context::Scope context_scope(session->env()->context());

// Sending data may call arbitrary JS code, so keep track of
// async context.
InternalCallbackScope callback_scope(session);
session->SendPendingData();
});
}

// Stop the uv_prep_t from further activity, destroy the handle
void Http2Session::Stop() {
DEBUG_HTTP2SESSION(this, "stopping uv_prep_t handle");
CHECK_EQ(uv_prepare_stop(prep_), 0);
auto prep_close = [](uv_handle_t* handle) {
delete reinterpret_cast<uv_prepare_t*>(handle);
};
uv_close(reinterpret_cast<uv_handle_t*>(prep_), prep_close);
prep_ = nullptr;
}


void Http2Session::Close() {
DEBUG_HTTP2SESSION(this, "closing session");
Expand All @@ -412,8 +396,6 @@ void Http2Session::Close() {
static_cast<Http2Session::Http2Ping*>(data)->Done(false);
}, static_cast<void*>(ping));
}

Stop();
}


Expand Down Expand Up @@ -484,6 +466,7 @@ inline void Http2Session::SubmitShutdownNotice() {
inline void Http2Session::Settings(const nghttp2_settings_entry iv[],
size_t niv) {
DEBUG_HTTP2SESSION2(this, "submitting %d settings", niv);
Http2Scope h2scope(this);
// This will fail either if the system is out of memory, or if the settings
// values are not within the appropriate range. We should be catching the
// latter before it gets this far so crash in either case.
Expand Down Expand Up @@ -736,7 +719,8 @@ Http2Stream::SubmitTrailers::SubmitTrailers(


inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers,
size_t length) const {
size_t length) const {
Http2Scope h2scope(session_);
if (length == 0)
return;
DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d",
Expand Down Expand Up @@ -891,14 +875,37 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) {
MakeCallback(env()->onsettings_string(), arraysize(argv), argv);
}

void Http2Session::MaybeScheduleWrite() {
CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0);
if (session_ != nullptr && nghttp2_session_want_write(session_)) {
flags_ |= SESSION_STATE_WRITE_SCHEDULED;
env()->SetImmediate([](Environment* env, void* data) {
Http2Session* session = static_cast<Http2Session*>(data);
if (session->session_ == nullptr ||
!(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) {
// This can happen e.g. when a stream was reset before this turn
// of the event loop, in which case SendPendingData() is called early,
// or the session was destroyed in the meantime.
return;
}

// Sending data may call arbitrary JS code, so keep track of
// async context.
InternalCallbackScope callback_scope(session);
session->SendPendingData();
}, static_cast<void*>(this), object());
}
}


inline void Http2Session::SendPendingData() {
void Http2Session::SendPendingData() {
DEBUG_HTTP2SESSION(this, "sending pending data");
// Do not attempt to send data on the socket if the destroying flag has
// been set. That means everything is shutting down and the socket
// will not be usable.
if (IsDestroying())
return;
flags_ &= ~SESSION_STATE_WRITE_SCHEDULED;

WriteWrap* req = nullptr;
char* dest = nullptr;
Expand Down Expand Up @@ -963,6 +970,7 @@ inline Http2Stream* Http2Session::SubmitRequest(
int32_t* ret,
int options) {
DEBUG_HTTP2SESSION(this, "submitting request");
Http2Scope h2scope(this);
Http2Stream* stream = nullptr;
Http2Stream::Provider::Stream prov(options);
*ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr);
Expand Down Expand Up @@ -1022,6 +1030,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread,
uv_handle_type pending,
void* ctx) {
Http2Session* session = static_cast<Http2Session*>(ctx);
Http2Scope h2scope(session);
if (nread < 0) {
uv_buf_t tmp_buf;
tmp_buf.base = nullptr;
Expand Down Expand Up @@ -1187,6 +1196,7 @@ inline void Http2Stream::Close(int32_t code) {


inline void Http2Stream::Shutdown() {
Http2Scope h2scope(this);
flags_ |= NGHTTP2_STREAM_FLAG_SHUT;
CHECK_NE(nghttp2_session_resume_data(session_->session(), id_),
NGHTTP2_ERR_NOMEM);
Expand All @@ -1201,6 +1211,7 @@ int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) {
}

inline void Http2Stream::Destroy() {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "destroying stream");
// Do nothing if this stream instance is already destroyed
if (IsDestroyed())
Expand Down Expand Up @@ -1252,6 +1263,7 @@ void Http2Stream::OnDataChunk(


inline void Http2Stream::FlushDataChunks() {
Http2Scope h2scope(this);
if (!data_chunks_.empty()) {
uv_buf_t buf = data_chunks_.front();
data_chunks_.pop();
Expand All @@ -1269,6 +1281,7 @@ inline void Http2Stream::FlushDataChunks() {
inline int Http2Stream::SubmitResponse(nghttp2_nv* nva,
size_t len,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "submitting response");
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
Expand All @@ -1289,6 +1302,7 @@ inline int Http2Stream::SubmitFile(int fd,
int64_t offset,
int64_t length,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "submitting file");
if (options & STREAM_OPTION_GET_TRAILERS)
flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS;
Expand All @@ -1305,6 +1319,7 @@ inline int Http2Stream::SubmitFile(int fd,

// Submit informational headers for a stream.
inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending %d informational headers", len);
int ret = nghttp2_submit_headers(session_->session(),
NGHTTP2_FLAG_NONE,
Expand All @@ -1317,6 +1332,7 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) {

inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,
bool silent) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "sending priority spec");
int ret = silent ?
nghttp2_session_change_stream_priority(session_->session(),
Expand All @@ -1330,6 +1346,7 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec,


inline int Http2Stream::SubmitRstStream(const uint32_t code) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM2(this, "sending rst-stream with code %d", code);
session_->SendPendingData();
CHECK_EQ(nghttp2_submit_rst_stream(session_->session(),
Expand All @@ -1345,6 +1362,7 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva,
size_t len,
int32_t* ret,
int options) {
Http2Scope h2scope(this);
DEBUG_HTTP2STREAM(this, "sending push promise");
*ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE,
id_, nva, len, nullptr);
Expand Down Expand Up @@ -1384,6 +1402,7 @@ inline int Http2Stream::Write(nghttp2_stream_write_t* req,
const uv_buf_t bufs[],
unsigned int nbufs,
nghttp2_stream_write_cb cb) {
Http2Scope h2scope(this);
if (!IsWritable()) {
if (cb != nullptr)
cb(req, UV_EOF);
Expand Down Expand Up @@ -1767,6 +1786,7 @@ void Http2Session::Goaway(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = env->context();
ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder());
Http2Scope h2scope(session);

uint32_t errorCode = args[0]->Uint32Value(context).ToChecked();
int32_t lastStreamID = args[1]->Int32Value(context).ToChecked();
Expand Down Expand Up @@ -2042,6 +2062,7 @@ void Http2Session::Http2Ping::Send(uint8_t* payload) {
memcpy(&data, &startTime_, arraysize(data));
payload = data;
}
Http2Scope h2scope(session_);
CHECK_EQ(nghttp2_submit_ping(**session_, NGHTTP2_FLAG_NONE, payload), 0);
}

Expand Down
22 changes: 21 additions & 1 deletion src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,9 @@ const char* nghttp2_errname(int rv) {

enum session_state_flags {
SESSION_STATE_NONE = 0x0,
SESSION_STATE_DESTROYING = 0x1
SESSION_STATE_DESTROYING = 0x1,
SESSION_STATE_HAS_SCOPE = 0x2,
SESSION_STATE_WRITE_SCHEDULED = 0x4
};

// This allows for 4 default-sized frames with their frame headers
Expand All @@ -429,6 +431,19 @@ typedef uint32_t(*get_setting)(nghttp2_session* session,
class Http2Session;
class Http2Stream;

// This scope should be present when any call into nghttp2 that may schedule
// data to be written to the underlying transport is made, and schedules
// such a write automatically once the scope is exited.
class Http2Scope {
public:
explicit Http2Scope(Http2Stream* stream);
explicit Http2Scope(Http2Session* session);
~Http2Scope();

private:
Http2Session* session_ = nullptr;
};

// The Http2Options class is used to parse the options object passed in to
// a Http2Session object and convert those into an appropriate nghttp2_option
// struct. This is the primary mechanism by which the Http2Session object is
Expand Down Expand Up @@ -816,6 +831,9 @@ class Http2Session : public AsyncWrap {
inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; }
inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; }

// Schedule a write if nghttp2 indicates it wants to write to the socket.
void MaybeScheduleWrite();

// Returns pointer to the stream, or nullptr if stream does not exist
inline Http2Stream* FindStream(int32_t id);

Expand Down Expand Up @@ -1005,6 +1023,8 @@ class Http2Session : public AsyncWrap {

size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS;
std::queue<Http2Ping*> outstanding_pings_;

friend class Http2Scope;
};

class Http2Session::Http2Ping : public AsyncWrap {
Expand Down
32 changes: 32 additions & 0 deletions test/parallel/test-http2-session-gc-while-write-scheduled.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Flags: --expose-gc

'use strict';
const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');
const http2 = require('http2');
const makeDuplexPair = require('../common/duplexpair');

// This tests that running garbage collection while an Http2Session has
// a write *scheduled*, it will survive that garbage collection.

{
// This creates a session and schedules a write (for the settings frame).
let client = http2.connect('http://localhost:80', {
createConnection: common.mustCall(() => makeDuplexPair().clientSide)
});

// First, wait for any nextTicks() and their responses
// from the `connect()` call to run.
tick(10, () => {
// This schedules a write.
client.settings(http2.getDefaultSettings());
client = null;
global.gc();
});
}

function tick(n, cb) {
if (n--) setImmediate(tick, n, cb);
else cb();
}

0 comments on commit 0e14510

Please sign in to comment.