Skip to content

Commit

Permalink
cluster: scheduler config option for cluster
Browse files Browse the repository at this point in the history
scheduler let's use a custom scheduler function for scheduling workers.
  • Loading branch information
yashLadha committed Apr 4, 2022
1 parent 0c9273d commit 8328c98
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 25 deletions.
11 changes: 11 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3319,6 +3319,17 @@ removed: v10.0.0

Used when a given value is out of the accepted range.

<a id="ERR_CLUSTER_INVALID_SCHEDULER"></a>

### `ERR_CLUSTER_INVALID_SCHEDULER`

<!-- YAML
added: v9.0.0
removed: v10.0.0
-->

Used when scheduler is not a function type.

<a id="ERR_VM_MODULE_NOT_LINKED"></a>

### `ERR_VM_MODULE_NOT_LINKED`
Expand Down
20 changes: 19 additions & 1 deletion lib/internal/cluster/primary.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const SCHED_RR = 2;
const minPort = 1024;
const maxPort = 65535;
const { validatePort } = require('internal/validators');
const {
ERR_CLUSTER_INVALID_SCHEDULER
} = require('internal/errors').codes;

module.exports = cluster;

Expand All @@ -42,6 +45,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections.
let ids = 0;
let debugPortOffset = 1;
let initialized = false;
let scheduler = RoundRobinHandle.scheduler;

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
Expand All @@ -58,6 +62,19 @@ else if (process.platform === 'win32') {

cluster.schedulingPolicy = schedulingPolicy;

function validateAndReturnScheduler(scheduler, schedulingPolicy) {
if (scheduler !== undefined) {
if (typeof scheduler.execute !== 'function') {
throw new ERR_CLUSTER_INVALID_SCHEDULER('scheduler.execute');
}
return scheduler;
} else if (schedulingPolicy === SCHED_RR) {
return { execute: RoundRobinHandle.scheduler };
} else if (schedulingPolicy !== SCHED_NONE) {
assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
}
}

cluster.setupPrimary = function(options) {
const settings = {
args: ArrayPrototypeSlice(process.argv, 2),
Expand Down Expand Up @@ -89,6 +106,7 @@ cluster.setupPrimary = function(options) {
assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy);
process.nextTick(setupSettingsNT, settings);

process.on('internalMessage', (message) => {
Expand Down Expand Up @@ -310,7 +328,7 @@ function queryServer(worker, message) {
message.addressType === 'udp6') {
handle = new SharedHandle(key, address, message);
} else {
handle = new RoundRobinHandle(key, address, message);
handle = new RoundRobinHandle(key, address, { ...message, scheduler });
}

handles.set(key, handle);
Expand Down
57 changes: 33 additions & 24 deletions lib/internal/cluster/round_robin_handle.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict';

const {
ArrayIsArray,
Boolean,
ObjectCreate,
SafeMap,
Expand All @@ -15,12 +14,12 @@ const { constants } = internalBinding('tcp_wrap');

module.exports = RoundRobinHandle;

function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) {
this.key = key;
this.all = new SafeMap();
this.free = new SafeMap();
this.workers = new SafeMap();
this.handles = init(ObjectCreate(null));
this.handle = null;
this.scheduler = scheduler;
this.server = net.createServer(assert.fail);

if (fd >= 0)
Expand All @@ -45,8 +44,8 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) {
}

RoundRobinHandle.prototype.add = function(worker, send) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);
assert(this.workers.has(worker.id) === false);
this.workers.set(worker.id, worker);

const done = () => {
if (this.handle.getsockname) {
Expand All @@ -58,7 +57,7 @@ RoundRobinHandle.prototype.add = function(worker, send) {
send(null, null, null); // UNIX socket.
}

this.handoff(worker); // In case there are connections pending.
this.handoff(); // In case there are connections pending.
};

if (this.server === null)
Expand All @@ -72,14 +71,12 @@ RoundRobinHandle.prototype.add = function(worker, send) {
};

RoundRobinHandle.prototype.remove = function(worker) {
const existed = this.all.delete(worker.id);
const existed = this.workers.delete(worker.id);

if (!existed)
return false;

this.free.delete(worker.id);

if (this.all.size !== 0)
if (this.workers.size !== 0)
return false;

while (!isEmpty(this.handles)) {
Expand All @@ -95,25 +92,37 @@ RoundRobinHandle.prototype.remove = function(worker) {

RoundRobinHandle.prototype.distribute = function(err, handle) {
append(this.handles, handle);
// eslint-disable-next-line node-core/no-array-destructuring
const [ workerEntry ] = this.free; // this.free is a SafeMap

if (ArrayIsArray(workerEntry)) {
const { 0: workerId, 1: worker } = workerEntry;
this.free.delete(workerId);
this.handoff(worker);
}
this.handoff();
};

RoundRobinHandle.prototype.handoff = function(worker) {
if (!this.all.has(worker.id)) {
return; // Worker is closing (or has closed) the server.
RoundRobinHandle.scheduler = function(workers) {
if (workers.size > 0) {
const { 0: workerId, 1: worker } = workers.entries().next().value;
workers.delete(workerId);
workers.set(workerId, worker);
return worker;
}
};

RoundRobinHandle.prototype.handoff = function() {
const handle = peek(this.handles);

if (handle === null) {
this.free.set(worker.id, worker); // Add to ready queue again.
return;
}

let socket;
if (this.scheduler.exposeSocket === true) {
socket = new net.Socket({
handle,
readable: false,
writable: false,
pauseOnCreate: true
});
}

const worker = this.scheduler.execute(this.workers, socket);
if (typeof worker === 'undefined') {
return;
}

Expand All @@ -127,6 +136,6 @@ RoundRobinHandle.prototype.handoff = function(worker) {
else
this.distribute(0, handle); // Worker is shutting down. Send to another.

this.handoff(worker);
this.handoff();
});
};
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -920,6 +920,7 @@ E('ERR_CHILD_PROCESS_IPC_REQUIRED',
Error);
E('ERR_CHILD_PROCESS_STDIO_MAXBUFFER', '%s maxBuffer length exceeded',
RangeError);
E('ERR_CLUSTER_INVALID_SCHEDULER', '%s is not a valid function', TypeError);
E('ERR_CONSOLE_WRITABLE_STREAM',
'Console expects a writable stream instance for %s', TypeError);
E('ERR_CONTEXT_NOT_INITIALIZED', 'context used is not initialized', Error);
Expand Down
83 changes: 83 additions & 0 deletions test/parallel/test-cluster-custom-scheduling.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict';

const common = require('../common');
const assert = require('assert');
const cluster = require('cluster');
const http = require('http');
const net = require('net');

if (cluster.isMaster) {
const numWorkers = 2;
const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2];
let index = 0;
let readyCount = 0;

// The scheduler moves through pattern. Each request is scheduled to the
// worker id specified in the current pattern index.
const execute = (workers, socket) => {
const id = pattern[index];
const worker = workers.get(id);

if (id === 2) {
assert.strictEqual(scheduler.exposeSocket, true);
assert(socket instanceof net.Socket);
} else {
assert.strictEqual(scheduler.exposeSocket, false);
assert.strictEqual(socket, undefined);
}

if (worker !== undefined)
index++;

return worker;
};

const scheduler = { execute };

// Create a getter for exposeSocket. If the current item in the pattern is 2,
// then expose the socket. Otherwise, hide it.
Object.defineProperty(scheduler, 'exposeSocket', {
get() { return pattern[index] === 2; }
});

function onMessage(msg) {
// Once both workers send a 'ready' signal, indicating that their servers
// are listening, begin making HTTP requests.
assert.strictEqual(msg.cmd, 'ready');
readyCount++;

if (readyCount === numWorkers)
makeRequest(0, msg.port);
}

function makeRequest(reqCount, port) {
// Make one request for each element in pattern and then shut down the
// workers.
if (reqCount >= pattern.length) {
for (const id in cluster.workers)
cluster.workers[id].disconnect();

return;
}

http.get({ port }, (res) => {
res.on('data', (data) => {
assert.strictEqual(+data.toString(), pattern[reqCount]);
reqCount++;
makeRequest(reqCount, port);
});
});
}

cluster.setupMaster({ scheduler });

for (let i = 0; i < numWorkers; i++)
cluster.fork().on('message', common.mustCall(onMessage));

} else {
const server = http.createServer((req, res) => {
res.end(cluster.worker.id + '');
}).listen(0, () => {
process.send({ cmd: 'ready', port: server.address().port });
});
}

0 comments on commit 8328c98

Please sign in to comment.