-
Notifications
You must be signed in to change notification settings - Fork 30.3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
cluster: scheduler config option for cluster
scheduler let's use a custom scheduler function for scheduling workers.
- Loading branch information
Showing
5 changed files
with
147 additions
and
25 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const assert = require('assert'); | ||
const cluster = require('cluster'); | ||
const http = require('http'); | ||
const net = require('net'); | ||
|
||
if (cluster.isMaster) { | ||
const numWorkers = 2; | ||
const pattern = [2, 1, 2, 2, 1, 2, 1, 1, 2]; | ||
let index = 0; | ||
let readyCount = 0; | ||
|
||
// The scheduler moves through pattern. Each request is scheduled to the | ||
// worker id specified in the current pattern index. | ||
const execute = (workers, socket) => { | ||
const id = pattern[index]; | ||
const worker = workers.get(id); | ||
|
||
if (id === 2) { | ||
assert.strictEqual(scheduler.exposeSocket, true); | ||
assert(socket instanceof net.Socket); | ||
} else { | ||
assert.strictEqual(scheduler.exposeSocket, false); | ||
assert.strictEqual(socket, undefined); | ||
} | ||
|
||
if (worker !== undefined) | ||
index++; | ||
|
||
return worker; | ||
}; | ||
|
||
const scheduler = { execute }; | ||
|
||
// Create a getter for exposeSocket. If the current item in the pattern is 2, | ||
// then expose the socket. Otherwise, hide it. | ||
Object.defineProperty(scheduler, 'exposeSocket', { | ||
get() { return pattern[index] === 2; } | ||
}); | ||
|
||
function onMessage(msg) { | ||
// Once both workers send a 'ready' signal, indicating that their servers | ||
// are listening, begin making HTTP requests. | ||
assert.strictEqual(msg.cmd, 'ready'); | ||
readyCount++; | ||
|
||
if (readyCount === numWorkers) | ||
makeRequest(0, msg.port); | ||
} | ||
|
||
function makeRequest(reqCount, port) { | ||
// Make one request for each element in pattern and then shut down the | ||
// workers. | ||
if (reqCount >= pattern.length) { | ||
for (const id in cluster.workers) | ||
cluster.workers[id].disconnect(); | ||
|
||
return; | ||
} | ||
|
||
http.get({ port }, (res) => { | ||
res.on('data', (data) => { | ||
assert.strictEqual(+data.toString(), pattern[reqCount]); | ||
reqCount++; | ||
makeRequest(reqCount, port); | ||
}); | ||
}); | ||
} | ||
|
||
cluster.setupMaster({ scheduler }); | ||
|
||
for (let i = 0; i < numWorkers; i++) | ||
cluster.fork().on('message', common.mustCall(onMessage)); | ||
|
||
} else { | ||
const server = http.createServer((req, res) => { | ||
res.end(cluster.worker.id + ''); | ||
}).listen(0, () => { | ||
process.send({ cmd: 'ready', port: server.address().port }); | ||
}); | ||
} |