Skip to content

Commit

Permalink
worker: create per-Environment message port after bootstrap
Browse files Browse the repository at this point in the history
  • Loading branch information
joyeecheung committed Mar 18, 2019
1 parent cf51ee4 commit 470856f
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 38 deletions.
2 changes: 1 addition & 1 deletion lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ if (isMainThread) {
setupProcessStdio(getStdout, getStdin, getStderr);
} else {
const { getStdout, getStdin, getStderr } =
workerThreadSetup.initializeWorkerStdio();
workerThreadSetup.createStdioGetters();
setupProcessStdio(getStdout, getStdin, getStderr);
}

Expand Down
29 changes: 11 additions & 18 deletions lib/internal/process/worker_thread_only.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,25 @@

// This file contains process bootstrappers that can only be
// run in the worker thread.
const {
getEnvMessagePort
} = internalBinding('worker');

const {
kWaitingStreams,
ReadableWorkerStdio,
WritableWorkerStdio
createWorkerStdio
} = require('internal/worker/io');

const {
codes: { ERR_WORKER_UNSUPPORTED_OPERATION }
} = require('internal/errors');
const workerStdio = {};

function initializeWorkerStdio() {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin');
workerStdio.stdout = new WritableWorkerStdio(port, 'stdout');
workerStdio.stderr = new WritableWorkerStdio(port, 'stderr');

let workerStdio;
function lazyWorkerStdio() {
if (!workerStdio) workerStdio = createWorkerStdio();
return workerStdio;
}
function createStdioGetters() {
return {
getStdout() { return workerStdio.stdout; },
getStderr() { return workerStdio.stderr; },
getStdin() { return workerStdio.stdin; }
getStdout() { return lazyWorkerStdio().stdout; },
getStderr() { return lazyWorkerStdio().stderr; },
getStdin() { return lazyWorkerStdio().stdin; }
};
}

Expand Down Expand Up @@ -55,7 +48,7 @@ function unavailable(name) {
}

module.exports = {
initializeWorkerStdio,
createStdioGetters,
unavailable,
wrapProcessMethods
};
18 changes: 16 additions & 2 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ const {
moveMessagePortToContext,
stopMessagePort
} = internalBinding('messaging');
const { threadId } = internalBinding('worker');
const {
threadId,
getEnvMessagePort
} = internalBinding('worker');

const { Readable, Writable } = require('stream');
const EventEmitter = require('events');
Expand Down Expand Up @@ -227,6 +230,16 @@ class WritableWorkerStdio extends Writable {
}
}

function createWorkerStdio() {
const port = getEnvMessagePort();
port[kWaitingStreams] = 0;
return {
stdin: new ReadableWorkerStdio(port, 'stdin'),
stdout: new WritableWorkerStdio(port, 'stdout'),
stderr: new WritableWorkerStdio(port, 'stderr')
};
}

module.exports = {
drainMessagePort,
messageTypes,
Expand All @@ -239,5 +252,6 @@ module.exports = {
MessageChannel,
setupPortReferencing,
ReadableWorkerStdio,
WritableWorkerStdio
WritableWorkerStdio,
createWorkerStdio
};
32 changes: 16 additions & 16 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,22 +269,6 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
// Set up the message channel for receiving messages in the child.
child_port_ = MessagePort::New(env_.get(),
env_->context(),
std::move(child_port_data_));
// MessagePort::New() may return nullptr if execution is terminated
// within it.
if (child_port_ != nullptr)
env_->set_message_port(child_port_->object(isolate_));

Debug(this, "Created message port for worker %llu", thread_id_);
}

if (is_stopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
std::move(inspector_parent_handle_),
Expand All @@ -296,6 +280,9 @@ void Worker::Run() {
Environment::AsyncCallbackScope callback_scope(env_.get());
env_->async_hooks()->push_async_ids(1, 0);
if (!RunBootstrapping(env_.get()).IsEmpty()) {
CreateEnvMessagePort(env_.get());
if (is_stopped()) return;
Debug(this, "Created message port for worker %llu", thread_id_);
USE(StartExecution(env_.get(), "internal/main/worker_thread"));
}

Expand Down Expand Up @@ -348,6 +335,19 @@ void Worker::Run() {
Debug(this, "Worker %llu thread stops", thread_id_);
}

void Worker::CreateEnvMessagePort(Environment* env) {
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
// Set up the message channel for receiving messages in the child.
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
// MessagePort::New() may return nullptr if execution is terminated
// within it.
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}

void Worker::JoinThread() {
if (thread_joined_)
return;
Expand Down
2 changes: 1 addition & 1 deletion src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Worker : public AsyncWrap {

private:
void OnThreadStopped();

void CreateEnvMessagePort(Environment* env);
const std::string url_;

std::shared_ptr<PerIsolateOptions> per_isolate_opts_;
Expand Down

0 comments on commit 470856f

Please sign in to comment.