diff --git a/src/env-inl.h b/src/env-inl.h index 0a9b494688a2de..78f53c9f0f7519 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -483,8 +483,15 @@ Environment::scheduled_immediate_count() { return scheduled_immediate_count_; } -void Environment::SetImmediate(native_immediate_callback cb, void* data) { - native_immediate_callbacks_.push_back({ cb, data }); +void Environment::SetImmediate(native_immediate_callback cb, + void* data, + v8::Local obj) { + native_immediate_callbacks_.push_back({ + cb, + data, + std::unique_ptr>( + obj.IsEmpty() ? nullptr : new v8::Persistent(isolate_, obj)) + }); if (scheduled_immediate_count_[0] == 0) ActivateImmediateCheck(); scheduled_immediate_count_[0] = scheduled_immediate_count_[0] + 1; diff --git a/src/env.cc b/src/env.cc index 82bdb4f900dc25..d9650fadd01140 100644 --- a/src/env.cc +++ b/src/env.cc @@ -278,6 +278,8 @@ void Environment::RunAndClearNativeImmediates() { native_immediate_callbacks_.swap(list); for (const auto& cb : list) { cb.cb_(this, cb.data_); + if (cb.keep_alive_) + cb.keep_alive_->Reset(); } #ifdef DEBUG diff --git a/src/env.h b/src/env.h index f5161b9f8a9959..1e0175946ec38a 100644 --- a/src/env.h +++ b/src/env.h @@ -686,7 +686,11 @@ class Environment { bool EmitNapiWarning(); typedef void (*native_immediate_callback)(Environment* env, void* data); - inline void SetImmediate(native_immediate_callback cb, void* data); + // cb will be called as cb(env, data) on the next event loop iteration. + // obj will be kept alive between now and after the callback has run. + inline void SetImmediate(native_immediate_callback cb, + void* data, + v8::Local obj = v8::Local()); // This needs to be available for the JS-land setImmediate(). void ActivateImmediateCheck(); @@ -751,6 +755,7 @@ class Environment { struct NativeImmediateCallback { native_immediate_callback cb_; void* data_; + std::unique_ptr> keep_alive_; }; std::vector native_immediate_callbacks_; void RunAndClearNativeImmediates(); diff --git a/src/node_http2.cc b/src/node_http2.cc index b439ae588a7756..465c045c661dad 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -27,6 +27,26 @@ const Http2Session::Callbacks Http2Session::callback_struct_saved[2] = { Callbacks(false), Callbacks(true)}; +Http2Scope::Http2Scope(Http2Stream* stream) : Http2Scope(stream->session()) {} + +Http2Scope::Http2Scope(Http2Session* session) { + if (session->flags_ & (SESSION_STATE_HAS_SCOPE | + SESSION_STATE_WRITE_SCHEDULED)) { + // There is another scope further below on the stack, or it is already + // known that a write is scheduled. In either case, there is nothing to do. + return; + } + session->flags_ |= SESSION_STATE_HAS_SCOPE; + session_ = session; +} + +Http2Scope::~Http2Scope() { + if (session_ == nullptr) + return; + + session_->flags_ &= ~SESSION_STATE_HAS_SCOPE; + session_->MaybeScheduleWrite(); +} Http2Options::Http2Options(Environment* env) { nghttp2_option_new(&options_); @@ -346,8 +366,6 @@ Http2Session::Http2Session(Environment* env, // be catching before it gets this far. Either way, crash if this // fails. CHECK_EQ(fn(&session_, callbacks, this, *opts), 0); - - Start(); } @@ -356,40 +374,6 @@ Http2Session::~Http2Session() { Close(); } -// For every node::Http2Session instance, there is a uv_prepare_t handle -// whose callback is triggered on every tick of the event loop. When -// run, nghttp2 is prompted to send any queued data it may have stored. -// TODO(jasnell): Currently, this creates one uv_prepare_t per Http2Session, -// we should investigate to see if it's faster to create a -// single uv_prepare_t for all Http2Sessions, then iterate -// over each. -void Http2Session::Start() { - prep_ = new uv_prepare_t(); - uv_prepare_init(env()->event_loop(), prep_); - prep_->data = static_cast(this); - uv_prepare_start(prep_, [](uv_prepare_t* t) { - Http2Session* session = static_cast(t->data); - HandleScope scope(session->env()->isolate()); - Context::Scope context_scope(session->env()->context()); - - // Sending data may call arbitrary JS code, so keep track of - // async context. - InternalCallbackScope callback_scope(session); - session->SendPendingData(); - }); -} - -// Stop the uv_prep_t from further activity, destroy the handle -void Http2Session::Stop() { - DEBUG_HTTP2SESSION(this, "stopping uv_prep_t handle"); - CHECK_EQ(uv_prepare_stop(prep_), 0); - auto prep_close = [](uv_handle_t* handle) { - delete reinterpret_cast(handle); - }; - uv_close(reinterpret_cast(prep_), prep_close); - prep_ = nullptr; -} - void Http2Session::Close() { DEBUG_HTTP2SESSION(this, "closing session"); @@ -406,10 +390,12 @@ void Http2Session::Close() { while (!outstanding_pings_.empty()) { Http2Session::Http2Ping* ping = PopPing(); - ping->Done(false); + // Since this method may be called from GC, calling into JS directly + // is not allowed. + env()->SetImmediate([](Environment* env, void* data) { + static_cast(data)->Done(false); + }, static_cast(ping)); } - - Stop(); } @@ -480,6 +466,7 @@ inline void Http2Session::SubmitShutdownNotice() { inline void Http2Session::Settings(const nghttp2_settings_entry iv[], size_t niv) { DEBUG_HTTP2SESSION2(this, "submitting %d settings", niv); + Http2Scope h2scope(this); // This will fail either if the system is out of memory, or if the settings // values are not within the appropriate range. We should be catching the // latter before it gets this far so crash in either case. @@ -732,7 +719,8 @@ Http2Stream::SubmitTrailers::SubmitTrailers( inline void Http2Stream::SubmitTrailers::Submit(nghttp2_nv* trailers, - size_t length) const { + size_t length) const { + Http2Scope h2scope(session_); if (length == 0) return; DEBUG_HTTP2SESSION2(session_, "sending trailers for stream %d, count: %d", @@ -887,14 +875,37 @@ inline void Http2Session::HandleSettingsFrame(const nghttp2_frame* frame) { MakeCallback(env()->onsettings_string(), arraysize(argv), argv); } +void Http2Session::MaybeScheduleWrite() { + CHECK_EQ(flags_ & SESSION_STATE_WRITE_SCHEDULED, 0); + if (session_ != nullptr && nghttp2_session_want_write(session_)) { + flags_ |= SESSION_STATE_WRITE_SCHEDULED; + env()->SetImmediate([](Environment* env, void* data) { + Http2Session* session = static_cast(data); + if (session->session_ == nullptr || + !(session->flags_ & SESSION_STATE_WRITE_SCHEDULED)) { + // This can happen e.g. when a stream was reset before this turn + // of the event loop, in which case SendPendingData() is called early, + // or the session was destroyed in the meantime. + return; + } + + // Sending data may call arbitrary JS code, so keep track of + // async context. + InternalCallbackScope callback_scope(session); + session->SendPendingData(); + }, static_cast(this), object()); + } +} + -inline void Http2Session::SendPendingData() { +void Http2Session::SendPendingData() { DEBUG_HTTP2SESSION(this, "sending pending data"); // Do not attempt to send data on the socket if the destroying flag has // been set. That means everything is shutting down and the socket // will not be usable. if (IsDestroying()) return; + flags_ &= ~SESSION_STATE_WRITE_SCHEDULED; WriteWrap* req = nullptr; char* dest = nullptr; @@ -959,6 +970,7 @@ inline Http2Stream* Http2Session::SubmitRequest( int32_t* ret, int options) { DEBUG_HTTP2SESSION(this, "submitting request"); + Http2Scope h2scope(this); Http2Stream* stream = nullptr; Http2Stream::Provider::Stream prov(options); *ret = nghttp2_submit_request(session_, prispec, nva, len, *prov, nullptr); @@ -1018,6 +1030,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, uv_handle_type pending, void* ctx) { Http2Session* session = static_cast(ctx); + Http2Scope h2scope(session); if (nread < 0) { uv_buf_t tmp_buf; tmp_buf.base = nullptr; @@ -1183,6 +1196,7 @@ inline void Http2Stream::Close(int32_t code) { inline void Http2Stream::Shutdown() { + Http2Scope h2scope(this); flags_ |= NGHTTP2_STREAM_FLAG_SHUT; CHECK_NE(nghttp2_session_resume_data(session_->session(), id_), NGHTTP2_ERR_NOMEM); @@ -1197,6 +1211,7 @@ int Http2Stream::DoShutdown(ShutdownWrap* req_wrap) { } inline void Http2Stream::Destroy() { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "destroying stream"); // Do nothing if this stream instance is already destroyed if (IsDestroyed()) @@ -1248,6 +1263,7 @@ void Http2Stream::OnDataChunk( inline void Http2Stream::FlushDataChunks() { + Http2Scope h2scope(this); if (!data_chunks_.empty()) { uv_buf_t buf = data_chunks_.front(); data_chunks_.pop(); @@ -1265,6 +1281,7 @@ inline void Http2Stream::FlushDataChunks() { inline int Http2Stream::SubmitResponse(nghttp2_nv* nva, size_t len, int options) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "submitting response"); if (options & STREAM_OPTION_GET_TRAILERS) flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; @@ -1285,6 +1302,7 @@ inline int Http2Stream::SubmitFile(int fd, int64_t offset, int64_t length, int options) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "submitting file"); if (options & STREAM_OPTION_GET_TRAILERS) flags_ |= NGHTTP2_STREAM_FLAG_TRAILERS; @@ -1301,6 +1319,7 @@ inline int Http2Stream::SubmitFile(int fd, // Submit informational headers for a stream. inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM2(this, "sending %d informational headers", len); int ret = nghttp2_submit_headers(session_->session(), NGHTTP2_FLAG_NONE, @@ -1313,6 +1332,7 @@ inline int Http2Stream::SubmitInfo(nghttp2_nv* nva, size_t len) { inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, bool silent) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "sending priority spec"); int ret = silent ? nghttp2_session_change_stream_priority(session_->session(), @@ -1326,6 +1346,7 @@ inline int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, inline int Http2Stream::SubmitRstStream(const uint32_t code) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM2(this, "sending rst-stream with code %d", code); session_->SendPendingData(); CHECK_EQ(nghttp2_submit_rst_stream(session_->session(), @@ -1341,6 +1362,7 @@ inline Http2Stream* Http2Stream::SubmitPushPromise(nghttp2_nv* nva, size_t len, int32_t* ret, int options) { + Http2Scope h2scope(this); DEBUG_HTTP2STREAM(this, "sending push promise"); *ret = nghttp2_submit_push_promise(session_->session(), NGHTTP2_FLAG_NONE, id_, nva, len, nullptr); @@ -1380,6 +1402,7 @@ inline int Http2Stream::Write(nghttp2_stream_write_t* req, const uv_buf_t bufs[], unsigned int nbufs, nghttp2_stream_write_cb cb) { + Http2Scope h2scope(this); if (!IsWritable()) { if (cb != nullptr) cb(req, UV_EOF); @@ -1763,6 +1786,7 @@ void Http2Session::Goaway(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); Local context = env->context(); ASSIGN_OR_RETURN_UNWRAP(&session, args.Holder()); + Http2Scope h2scope(session); uint32_t errorCode = args[0]->Uint32Value(context).ToChecked(); int32_t lastStreamID = args[1]->Int32Value(context).ToChecked(); @@ -2038,6 +2062,7 @@ void Http2Session::Http2Ping::Send(uint8_t* payload) { memcpy(&data, &startTime_, arraysize(data)); payload = data; } + Http2Scope h2scope(session_); CHECK_EQ(nghttp2_submit_ping(**session_, NGHTTP2_FLAG_NONE, payload), 0); } diff --git a/src/node_http2.h b/src/node_http2.h index f375a249b803b8..5375d1a68cf6c5 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -417,7 +417,9 @@ const char* nghttp2_errname(int rv) { enum session_state_flags { SESSION_STATE_NONE = 0x0, - SESSION_STATE_DESTROYING = 0x1 + SESSION_STATE_DESTROYING = 0x1, + SESSION_STATE_HAS_SCOPE = 0x2, + SESSION_STATE_WRITE_SCHEDULED = 0x4 }; // This allows for 4 default-sized frames with their frame headers @@ -429,6 +431,19 @@ typedef uint32_t(*get_setting)(nghttp2_session* session, class Http2Session; class Http2Stream; +// This scope should be present when any call into nghttp2 that may schedule +// data to be written to the underlying transport is made, and schedules +// such a write automatically once the scope is exited. +class Http2Scope { + public: + explicit Http2Scope(Http2Stream* stream); + explicit Http2Scope(Http2Session* session); + ~Http2Scope(); + + private: + Http2Session* session_ = nullptr; +}; + // The Http2Options class is used to parse the options object passed in to // a Http2Session object and convert those into an appropriate nghttp2_option // struct. This is the primary mechanism by which the Http2Session object is @@ -816,6 +831,9 @@ class Http2Session : public AsyncWrap { inline void MarkDestroying() { flags_ |= SESSION_STATE_DESTROYING; } inline bool IsDestroying() { return flags_ & SESSION_STATE_DESTROYING; } + // Schedule a write if nghttp2 indicates it wants to write to the socket. + void MaybeScheduleWrite(); + // Returns pointer to the stream, or nullptr if stream does not exist inline Http2Stream* FindStream(int32_t id); @@ -1005,6 +1023,8 @@ class Http2Session : public AsyncWrap { size_t max_outstanding_pings_ = DEFAULT_MAX_PINGS; std::queue outstanding_pings_; + + friend class Http2Scope; }; class Http2Session::Http2Ping : public AsyncWrap { diff --git a/test/parallel/test-http2-session-gc-while-write-scheduled.js b/test/parallel/test-http2-session-gc-while-write-scheduled.js new file mode 100644 index 00000000000000..bb23760cebf967 --- /dev/null +++ b/test/parallel/test-http2-session-gc-while-write-scheduled.js @@ -0,0 +1,32 @@ +// Flags: --expose-gc + +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const http2 = require('http2'); +const makeDuplexPair = require('../common/duplexpair'); + +// This tests that running garbage collection while an Http2Session has +// a write *scheduled*, it will survive that garbage collection. + +{ + // This creates a session and schedules a write (for the settings frame). + let client = http2.connect('http://localhost:80', { + createConnection: common.mustCall(() => makeDuplexPair().clientSide) + }); + + // First, wait for any nextTicks() and their responses + // from the `connect()` call to run. + tick(10, () => { + // This schedules a write. + client.settings(http2.getDefaultSettings()); + client = null; + global.gc(); + }); +} + +function tick(n, cb) { + if (n--) setImmediate(tick, n, cb); + else cb(); +}