Skip to content

Commit

Permalink
worker: improve MessagePort performance
Browse files Browse the repository at this point in the history
Use a JS function as the single entry point for emitting `.onmessage()`
calls, avoiding the overhead of manually constructing each message
event object in C++.

                                                                             confidence improvement accuracy (*)   (**)  (***)
    worker/echo.js n=100000 sendsPerBroadcast=1 payload='object' workers=1         ***     16.34 %       ±1.16% ±1.54% ±1.99%
    worker/echo.js n=100000 sendsPerBroadcast=1 payload='string' workers=1         ***     24.41 %       ±1.50% ±1.99% ±2.58%
    worker/echo.js n=100000 sendsPerBroadcast=10 payload='object' workers=1        ***     26.66 %       ±1.54% ±2.05% ±2.65%
    worker/echo.js n=100000 sendsPerBroadcast=10 payload='string' workers=1        ***     32.72 %       ±1.60% ±2.11% ±2.73%
    worker/messageport.js n=1000000 payload='object'                               ***     40.28 %       ±1.48% ±1.95% ±2.52%
    worker/messageport.js n=1000000 payload='string'                               ***     76.95 %       ±2.19% ±2.90% ±3.75%

Also fix handling exceptions returned from `MessagePort::New`.

PR-URL: #31605
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and targos committed Apr 18, 2020
1 parent 3bf21b0 commit 28cb7e7
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 24 deletions.
12 changes: 12 additions & 0 deletions lib/internal/per_context/messageport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';
class MessageEvent {
constructor(data, target) {
this.data = data;
this.target = target;
}
}

exports.emitMessage = function(data) {
if (typeof this.onmessage === 'function')
this.onmessage(new MessageEvent(data, this));
};
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
'lib/internal/bootstrap/switches/is_not_main_thread.js',
'lib/internal/per_context/primordials.js',
'lib/internal/per_context/domexception.js',
'lib/internal/per_context/messageport.js',
'lib/async_hooks.js',
'lib/assert.js',
'lib/buffer.js',
Expand Down
1 change: 1 addition & 0 deletions src/api/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ bool InitializeContextForSnapshot(Local<Context> context) {

static const char* context_files[] = {"internal/per_context/primordials",
"internal/per_context/domexception",
"internal/per_context/messageport",
nullptr};

for (const char** module = context_files; *module != nullptr; module++) {
Expand Down
1 change: 0 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ constexpr size_t kFsStatsBufferLength =
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(http2ping_constructor_template, v8::ObjectTemplate) \
V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \
V(message_event_object_template, v8::ObjectTemplate) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(promise_wrap_template, v8::ObjectTemplate) \
Expand Down
63 changes: 41 additions & 22 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::ObjectTemplate;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Symbol;
Expand Down Expand Up @@ -188,6 +187,20 @@ uint32_t Message::AddWASMModule(WasmModuleObject::TransferrableModule&& mod) {

namespace {

MaybeLocal<Function> GetEmitMessageFunction(Local<Context> context) {
Isolate* isolate = context->GetIsolate();
Local<Object> per_context_bindings;
Local<Value> emit_message_val;
if (!GetPerContextExports(context).ToLocal(&per_context_bindings) ||
!per_context_bindings->Get(context,
FIXED_ONE_BYTE_STRING(isolate, "emitMessage"))
.ToLocal(&emit_message_val)) {
return MaybeLocal<Function>();
}
CHECK(emit_message_val->IsFunction());
return emit_message_val.As<Function>();
}

MaybeLocal<Function> GetDOMException(Local<Context> context) {
Isolate* isolate = context->GetIsolate();
Local<Object> per_context_bindings;
Expand Down Expand Up @@ -498,20 +511,31 @@ MessagePort::MessagePort(Environment* env,
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
channel->OnMessage();
};

CHECK_EQ(uv_async_init(env->event_loop(),
&async_,
onmessage), 0);
async_.data = static_cast<void*>(this);
async_.data = nullptr; // Reset later to indicate success of the constructor.
auto cleanup = OnScopeLeave([&]() {
if (async_.data == nullptr) Close();
});

Local<Value> fn;
if (!wrap->Get(context, env->oninit_symbol()).ToLocal(&fn))
return;

if (fn->IsFunction()) {
Local<Function> init = fn.As<Function>();
USE(init->Call(context, wrap, 0, nullptr));
if (init->Call(context, wrap, 0, nullptr).IsEmpty())
return;
}

Local<Function> emit_message_fn;
if (!GetEmitMessageFunction(context).ToLocal(&emit_message_fn))
return;
emit_message_fn_.Reset(env->isolate(), emit_message_fn);

async_.data = static_cast<void*>(this);
Debug(this, "Created message port");
}

Expand Down Expand Up @@ -559,6 +583,11 @@ MessagePort* MessagePort::New(
return nullptr;
MessagePort* port = new MessagePort(env, context, instance);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
return nullptr;
}

if (data) {
port->Detach();
port->data_ = std::move(data);
Expand Down Expand Up @@ -651,16 +680,8 @@ void MessagePort::OnMessage() {
continue;
}

Local<Object> event;
Local<Value> cb_args[1];
if (!env()->message_event_object_template()->NewInstance(context)
.ToLocal(&event) ||
event->Set(context, env()->data_string(), payload).IsNothing() ||
event->Set(context, env()->target_string(), object()).IsNothing() ||
(cb_args[0] = event, false) ||
MakeCallback(env()->onmessage_string(),
arraysize(cb_args),
cb_args).IsEmpty()) {
Local<Function> emit_message = PersistentToLocal::Strong(emit_message_fn_);
if (MakeCallback(emit_message, 1, &payload).IsEmpty()) {
// Re-schedule OnMessage() execution in case of failure.
if (data_)
TriggerAsync();
Expand Down Expand Up @@ -929,6 +950,7 @@ void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {

void MessagePort::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("data", data_);
tracker->TrackField("emit_message_fn", emit_message_fn_);
}

Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
Expand All @@ -938,8 +960,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
if (!templ.IsEmpty())
return templ;

Isolate* isolate = env->isolate();

{
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
m->SetClassName(env->message_port_constructor_string());
Expand All @@ -950,13 +970,6 @@ Local<FunctionTemplate> GetMessagePortConstructorTemplate(Environment* env) {
env->SetProtoMethod(m, "start", MessagePort::Start);

env->set_message_port_constructor_template(m);

Local<FunctionTemplate> event_ctor = FunctionTemplate::New(isolate);
event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent"));
Local<ObjectTemplate> e = event_ctor->InstanceTemplate();
e->Set(env->data_string(), Null(isolate));
e->Set(env->target_string(), Null(isolate));
env->set_message_event_object_template(e);
}

return GetMessagePortConstructorTemplate(env);
Expand All @@ -975,7 +988,13 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
Context::Scope context_scope(context);

MessagePort* port1 = MessagePort::New(env, context);
if (port1 == nullptr) return;
MessagePort* port2 = MessagePort::New(env, context);
if (port2 == nullptr) {
port1->Close();
return;
}

MessagePort::Entangle(port1, port2);

args.This()->Set(context, env->port1_string(), port1->object())
Expand Down
7 changes: 6 additions & 1 deletion src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,16 @@ class MessagePortData : public MemoryRetainer {
// the uv_async_t handle that is used to notify the current event loop of
// new incoming messages.
class MessagePort : public HandleWrap {
public:
private:
// Create a new MessagePort. The `context` argument specifies the Context
// instance that is used for creating the values emitted from this port.
// This is called by MessagePort::New(), which is the public API used for
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);

public:
~MessagePort() override;

// Create a new message port instance, optionally over an existing
Expand Down Expand Up @@ -206,6 +210,7 @@ class MessagePort : public HandleWrap {
std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
uv_async_t async_;
v8::Global<v8::Function> emit_message_fn_;

friend class MessagePortData;
};
Expand Down

0 comments on commit 28cb7e7

Please sign in to comment.