diff --git a/doc/api/errors.md b/doc/api/errors.md index a11a94981fefa6..0b6cebe74f3968 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -3319,6 +3319,17 @@ removed: v10.0.0 Used when a given value is out of the accepted range. + + +### `ERR_CLUSTER_INVALID_SCHEDULER` + + + +Used when scheduler is not a function type. + ### `ERR_VM_MODULE_NOT_LINKED` diff --git a/lib/internal/cluster/primary.js b/lib/internal/cluster/primary.js index ed5b06d798868c..66a2c8cbc1db29 100644 --- a/lib/internal/cluster/primary.js +++ b/lib/internal/cluster/primary.js @@ -26,6 +26,9 @@ const SCHED_RR = 2; const minPort = 1024; const maxPort = 65535; const { validatePort } = require('internal/validators'); +const { + ERR_CLUSTER_INVALID_SCHEDULER +} = require('internal/errors').codes; module.exports = cluster; @@ -42,6 +45,7 @@ cluster.SCHED_RR = SCHED_RR; // Primary distributes connections. let ids = 0; let debugPortOffset = 1; let initialized = false; +let scheduler = RoundRobinHandle.scheduler; // XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings? let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY; @@ -58,6 +62,19 @@ else if (process.platform === 'win32') { cluster.schedulingPolicy = schedulingPolicy; +function validateAndReturnScheduler(scheduler, schedulingPolicy) { + if (scheduler !== undefined) { + if (typeof scheduler.execute !== 'function') { + throw new ERR_CLUSTER_INVALID_SCHEDULER('scheduler.execute'); + } + return scheduler; + } else if (schedulingPolicy === SCHED_RR) { + return { execute: RoundRobinHandle.scheduler }; + } else if (schedulingPolicy !== SCHED_NONE) { + assert(false, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`); + } +} + cluster.setupPrimary = function(options) { const settings = { args: ArrayPrototypeSlice(process.argv, 2), @@ -89,6 +106,7 @@ cluster.setupPrimary = function(options) { assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR, `Bad cluster.schedulingPolicy: ${schedulingPolicy}`); + scheduler = validateAndReturnScheduler(cluster.settings.scheduler, schedulingPolicy); process.nextTick(setupSettingsNT, settings); process.on('internalMessage', (message) => { @@ -310,7 +328,7 @@ function queryServer(worker, message) { message.addressType === 'udp6') { handle = new SharedHandle(key, address, message); } else { - handle = new RoundRobinHandle(key, address, message); + handle = new RoundRobinHandle(key, address, { ...message, scheduler }); } handles.set(key, handle); diff --git a/lib/internal/cluster/round_robin_handle.js b/lib/internal/cluster/round_robin_handle.js index 9d242cc60ad7c1..017cb4cb330191 100644 --- a/lib/internal/cluster/round_robin_handle.js +++ b/lib/internal/cluster/round_robin_handle.js @@ -1,7 +1,6 @@ 'use strict'; const { - ArrayIsArray, Boolean, ObjectCreate, SafeMap, @@ -15,12 +14,12 @@ const { constants } = internalBinding('tcp_wrap'); module.exports = RoundRobinHandle; -function RoundRobinHandle(key, address, { port, fd, flags, backlog }) { +function RoundRobinHandle(key, address, { port, fd, flags, backlog, scheduler }) { this.key = key; - this.all = new SafeMap(); - this.free = new SafeMap(); + this.workers = new SafeMap(); this.handles = init(ObjectCreate(null)); this.handle = null; + this.scheduler = scheduler; this.server = net.createServer(assert.fail); if (fd >= 0) @@ -45,8 +44,8 @@ function RoundRobinHandle(key, address, { port, fd, flags, backlog }) { } RoundRobinHandle.prototype.add = function(worker, send) { - assert(this.all.has(worker.id) === false); - this.all.set(worker.id, worker); + assert(this.workers.has(worker.id) === false); + this.workers.set(worker.id, worker); const done = () => { if (this.handle.getsockname) { @@ -58,7 +57,7 @@ RoundRobinHandle.prototype.add = function(worker, send) { send(null, null, null); // UNIX socket. } - this.handoff(worker); // In case there are connections pending. + this.handoff(); // In case there are connections pending. }; if (this.server === null) @@ -72,14 +71,12 @@ RoundRobinHandle.prototype.add = function(worker, send) { }; RoundRobinHandle.prototype.remove = function(worker) { - const existed = this.all.delete(worker.id); + const existed = this.workers.delete(worker.id); if (!existed) return false; - this.free.delete(worker.id); - - if (this.all.size !== 0) + if (this.workers.size !== 0) return false; while (!isEmpty(this.handles)) { @@ -95,25 +92,37 @@ RoundRobinHandle.prototype.remove = function(worker) { RoundRobinHandle.prototype.distribute = function(err, handle) { append(this.handles, handle); - // eslint-disable-next-line node-core/no-array-destructuring - const [ workerEntry ] = this.free; // this.free is a SafeMap - - if (ArrayIsArray(workerEntry)) { - const { 0: workerId, 1: worker } = workerEntry; - this.free.delete(workerId); - this.handoff(worker); - } + this.handoff(); }; -RoundRobinHandle.prototype.handoff = function(worker) { - if (!this.all.has(worker.id)) { - return; // Worker is closing (or has closed) the server. +RoundRobinHandle.scheduler = function(workers) { + if (workers.size > 0) { + const { 0: workerId, 1: worker } = workers.entries().next().value; + workers.delete(workerId); + workers.set(workerId, worker); + return worker; } +}; +RoundRobinHandle.prototype.handoff = function() { const handle = peek(this.handles); if (handle === null) { - this.free.set(worker.id, worker); // Add to ready queue again. + return; + } + + let socket; + if (this.scheduler.exposeSocket === true) { + socket = new net.Socket({ + handle, + readable: false, + writable: false, + pauseOnCreate: true + }); + } + + const worker = this.scheduler.execute(this.workers, socket); + if (typeof worker === 'undefined') { return; } @@ -127,6 +136,6 @@ RoundRobinHandle.prototype.handoff = function(worker) { else this.distribute(0, handle); // Worker is shutting down. Send to another. - this.handoff(worker); + this.handoff(); }); }; diff --git a/lib/internal/errors.js b/lib/internal/errors.js index 5f75c0290b33d9..fe217512df2d92 100644 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -920,6 +920,7 @@ E('ERR_CHILD_PROCESS_IPC_REQUIRED', Error); E('ERR_CHILD_PROCESS_STDIO_MAXBUFFER', '%s maxBuffer length exceeded', RangeError); +E('ERR_CLUSTER_INVALID_SCHEDULER', '%s is not a valid function', TypeError); E('ERR_CONSOLE_WRITABLE_STREAM', 'Console expects a writable stream instance for %s', TypeError); E('ERR_CONTEXT_NOT_INITIALIZED', 'context used is not initialized', Error); diff --git a/test/parallel/test-cluster-custom-scheduling.js b/test/parallel/test-cluster-custom-scheduling.js new file mode 100644 index 00000000000000..4a7a00ecda9a06 --- /dev/null +++ b/test/parallel/test-cluster-custom-scheduling.js @@ -0,0 +1,83 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); +const http = require('http'); +const net = require('net'); + +if (cluster.isMaster) { + const numWorkers = 2; + const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2]; + let index = 0; + let readyCount = 0; + + // The scheduler moves through pattern. Each request is scheduled to the + // worker id specified in the current pattern index. + const execute = (workers, socket) => { + const id = pattern[index]; + const worker = workers.get(id); + + if (id === 2) { + assert.strictEqual(scheduler.exposeSocket, true); + assert(socket instanceof net.Socket); + } else { + assert.strictEqual(scheduler.exposeSocket, false); + assert.strictEqual(socket, undefined); + } + + if (worker !== undefined) + index++; + + return worker; + }; + + const scheduler = { execute }; + + // Create a getter for exposeSocket. If the current item in the pattern is 2, + // then expose the socket. Otherwise, hide it. + Object.defineProperty(scheduler, 'exposeSocket', { + get() { return pattern[index] === 2; } + }); + + function onMessage(msg) { + // Once both workers send a 'ready' signal, indicating that their servers + // are listening, begin making HTTP requests. + assert.strictEqual(msg.cmd, 'ready'); + readyCount++; + + if (readyCount === numWorkers) + makeRequest(0, msg.port); + } + + function makeRequest(reqCount, port) { + // Make one request for each element in pattern and then shut down the + // workers. + if (reqCount >= pattern.length) { + for (const id in cluster.workers) + cluster.workers[id].disconnect(); + + return; + } + + http.get({ port }, (res) => { + res.on('data', (data) => { + assert.strictEqual(+data.toString(), pattern[reqCount]); + reqCount++; + makeRequest(reqCount, port); + }); + }); + } + + cluster.setupMaster({ scheduler }); + + for (let i = 0; i < numWorkers; i++) + cluster.fork().on('message', common.mustCall(onMessage)); + +} else { + const server = http.createServer((req, res) => { + res.end(cluster.worker.id + ''); + }).listen(0, () => { + process.send({ cmd: 'ready', port: server.address().port }); + }); +}