Skip to content

Commit

Permalink
cluster: refactor to use more primordials
Browse files Browse the repository at this point in the history
PR-URL: #36011
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
aduh95 authored and BethGriggs committed Dec 15, 2020
1 parent c508bfc commit 929e127
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 33 deletions.
30 changes: 18 additions & 12 deletions lib/internal/cluster/child.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
'use strict';

const {
Map,
ArrayPrototypeJoin,
FunctionPrototype,
ObjectAssign,
ReflectApply,
SafeMap,
} = primordials;

const assert = require('internal/assert');
Expand All @@ -12,9 +15,9 @@ const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const cluster = new EventEmitter();
const handles = new Map();
const indexes = new Map();
const noop = () => {};
const handles = new SafeMap();
const indexes = new SafeMap();
const noop = FunctionPrototype;

module.exports = cluster;

Expand Down Expand Up @@ -49,7 +52,7 @@ cluster._setupWorker = function() {
if (message.act === 'newconn')
onconnection(message, handle);
else if (message.act === 'disconnect')
_disconnect.call(worker, true);
ReflectApply(_disconnect, worker, [true]);
}
};

Expand All @@ -62,10 +65,13 @@ cluster._getServer = function(obj, options, cb) {
process.platform !== 'win32')
address = path.resolve(address);

const indexesKey = [address,
options.port,
options.addressType,
options.fd ].join(':');
const indexesKey = ArrayPrototypeJoin(
[
address,
options.port,
options.addressType,
options.fd,
], ':');

let index = indexes.get(indexesKey);

Expand Down Expand Up @@ -119,7 +125,7 @@ function shared(message, handle, indexesKey, cb) {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
return close.apply(handle, arguments);
return ReflectApply(close, handle, arguments);
};
assert(handles.has(key) === false);
handles.set(key, handle);
Expand Down Expand Up @@ -228,9 +234,9 @@ function _disconnect(masterInitiated) {

// Extend generic Worker with methods specific to worker processes.
Worker.prototype.disconnect = function() {
if (![ 'disconnecting', 'destroying' ].includes(this.state)) {
if (this.state !== 'disconnecting' && this.state !== 'destroying') {
this.state = 'disconnecting';
_disconnect.call(this);
ReflectApply(_disconnect, this, []);
}

return this;
Expand Down
24 changes: 16 additions & 8 deletions lib/internal/cluster/master.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
'use strict';

const {
Map,
ArrayPrototypePush,
ArrayPrototypeSlice,
ArrayPrototypeSome,
ObjectKeys,
ObjectValues,
RegExpPrototypeTest,
SafeMap,
StringPrototypeStartsWith,
} = primordials;

const assert = require('internal/assert');
Expand All @@ -23,7 +28,7 @@ const { validatePort } = require('internal/validators');

module.exports = cluster;

const handles = new Map();
const handles = new SafeMap();
cluster.isWorker = false;
cluster.isMaster = true;
cluster.Worker = Worker;
Expand Down Expand Up @@ -53,7 +58,7 @@ cluster.schedulingPolicy = schedulingPolicy;

cluster.setupMaster = function(options) {
const settings = {
args: process.argv.slice(2),
args: ArrayPrototypeSlice(process.argv, 2),
exec: process.argv[1],
execArgv: process.execArgv,
silent: false,
Expand All @@ -65,8 +70,10 @@ cluster.setupMaster = function(options) {
// Without --logfile=v8-%p.log, everything ends up in a single, unusable
// file. (Unusable because what V8 logs are memory addresses and each
// process has its own memory mappings.)
if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
!settings.execArgv.some((s) => s.startsWith('--logfile='))) {
if (ArrayPrototypeSome(settings.execArgv,
(s) => StringPrototypeStartsWith(s, '--prof')) &&
!ArrayPrototypeSome(settings.execArgv,
(s) => StringPrototypeStartsWith(s, '--logfile='))) {
settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
}

Expand Down Expand Up @@ -109,8 +116,9 @@ function createWorkerProcess(id, env) {
const nodeOptions = process.env.NODE_OPTIONS ?
process.env.NODE_OPTIONS : '';

if (execArgv.some((arg) => arg.match(debugArgRegex)) ||
nodeOptions.match(debugArgRegex)) {
if (ArrayPrototypeSome(execArgv,
(arg) => RegExpPrototypeTest(debugArgRegex, arg)) ||
RegExpPrototypeTest(debugArgRegex, nodeOptions)) {
let inspectPort;
if ('inspectPort' in cluster.settings) {
if (typeof cluster.settings.inspectPort === 'function')
Expand All @@ -126,7 +134,7 @@ function createWorkerProcess(id, env) {
debugPortOffset++;
}

execArgv.push(`--inspect-port=${inspectPort}`);
ArrayPrototypePush(execArgv, `--inspect-port=${inspectPort}`);
}

return fork(cluster.settings.exec, cluster.settings.args, {
Expand Down
12 changes: 7 additions & 5 deletions lib/internal/cluster/round_robin_handle.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

const {
ArrayIsArray,
ArrayPrototypePush,
ArrayPrototypeShift,
Boolean,
Map,
SafeMap,
} = primordials;

const assert = require('internal/assert');
Expand All @@ -15,8 +17,8 @@ module.exports = RoundRobinHandle;

function RoundRobinHandle(key, address, { port, fd, flags }) {
this.key = key;
this.all = new Map();
this.free = new Map();
this.all = new SafeMap();
this.free = new SafeMap();
this.handles = [];
this.handle = null;
this.server = net.createServer(assert.fail);
Expand Down Expand Up @@ -90,7 +92,7 @@ RoundRobinHandle.prototype.remove = function(worker) {
};

RoundRobinHandle.prototype.distribute = function(err, handle) {
this.handles.push(handle);
ArrayPrototypePush(this.handles, handle);
const [ workerEntry ] = this.free;

if (ArrayIsArray(workerEntry)) {
Expand All @@ -105,7 +107,7 @@ RoundRobinHandle.prototype.handoff = function(worker) {
return; // Worker is closing (or has closed) the server.
}

const handle = this.handles.shift();
const handle = ArrayPrototypeShift(this.handles);

if (handle === undefined) {
this.free.set(worker.id, worker); // Add to ready queue again.
Expand Down
4 changes: 2 additions & 2 deletions lib/internal/cluster/shared_handle.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
'use strict';
const { Map } = primordials;
const { SafeMap } = primordials;
const assert = require('internal/assert');
const dgram = require('internal/dgram');
const net = require('net');
Expand All @@ -8,7 +8,7 @@ module.exports = SharedHandle;

function SharedHandle(key, address, { port, addressType, fd, flags }) {
this.key = key;
this.workers = new Map();
this.workers = new SafeMap();
this.handle = null;
this.errno = 0;

Expand Down
7 changes: 4 additions & 3 deletions lib/internal/cluster/utils.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
'use strict';

const {
Map,
ReflectApply,
SafeMap,
} = primordials;

module.exports = {
sendHelper,
internal
};

const callbacks = new Map();
const callbacks = new SafeMap();
let seq = 0;

function sendHelper(proc, message, handle, cb) {
Expand Down Expand Up @@ -44,6 +45,6 @@ function internal(worker, cb) {
}
}

fn.apply(worker, arguments);
ReflectApply(fn, worker, arguments);
};
}
7 changes: 4 additions & 3 deletions lib/internal/cluster/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const {
ObjectSetPrototypeOf,
ReflectApply,
} = primordials;

const EventEmitter = require('events');
Expand All @@ -13,7 +14,7 @@ function Worker(options) {
if (!(this instanceof Worker))
return new Worker(options);

EventEmitter.call(this);
ReflectApply(EventEmitter, this, []);

if (options === null || typeof options !== 'object')
options = {};
Expand All @@ -38,11 +39,11 @@ ObjectSetPrototypeOf(Worker.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Worker, EventEmitter);

Worker.prototype.kill = function() {
this.destroy.apply(this, arguments);
ReflectApply(this.destroy, this, arguments);
};

Worker.prototype.send = function() {
return this.process.send.apply(this.process, arguments);
return ReflectApply(this.process.send, this.process, arguments);
};

Worker.prototype.isDead = function() {
Expand Down

0 comments on commit 929e127

Please sign in to comment.