diff --git a/packages/core/parcel-bundler/package.json b/packages/core/parcel-bundler/package.json index 37cbbcb6402..33a964c05f7 100644 --- a/packages/core/parcel-bundler/package.json +++ b/packages/core/parcel-bundler/package.json @@ -59,7 +59,6 @@ "tomlify-j0.4": "^3.0.0", "uglify-es": "^3.2.1", "v8-compile-cache": "^1.1.0", - "worker-farm": "^1.5.2", "ws": "^4.0.0" }, "devDependencies": { diff --git a/packages/core/parcel-bundler/src/Bundler.js b/packages/core/parcel-bundler/src/Bundler.js index e7fdbb0fe3a..26a0e2fa35f 100644 --- a/packages/core/parcel-bundler/src/Bundler.js +++ b/packages/core/parcel-bundler/src/Bundler.js @@ -1,7 +1,7 @@ const fs = require('./utils/fs'); const Resolver = require('./Resolver'); const Parser = require('./Parser'); -const WorkerFarm = require('./WorkerFarm'); +const WorkerFarm = require('./workerfarm/WorkerFarm'); const Path = require('path'); const Bundle = require('./Bundle'); const {FSWatcher} = require('chokidar'); diff --git a/packages/core/parcel-bundler/src/Logger.js b/packages/core/parcel-bundler/src/Logger.js index d92b5863080..4ea682a4ac2 100644 --- a/packages/core/parcel-bundler/src/Logger.js +++ b/packages/core/parcel-bundler/src/Logger.js @@ -195,15 +195,19 @@ function stringWidth(string) { // If we are in a worker, make a proxy class which will // send the logger calls to the main process via IPC. // These are handled in WorkerFarm and directed to handleMessage above. -if (process.send) { +if (process.send && process.env.WORKER_TYPE === 'parcel-worker') { + const worker = require('./worker'); class LoggerProxy {} for (let method of Object.getOwnPropertyNames(Logger.prototype)) { LoggerProxy.prototype[method] = (...args) => { - process.send({ - type: 'logger', - method, - args - }); + worker.addCall( + { + location: require.resolve('./Logger'), + method, + args + }, + false + ); }; } diff --git a/packages/core/parcel-bundler/src/WorkerFarm.js b/packages/core/parcel-bundler/src/WorkerFarm.js deleted file mode 100644 index bf5bbb19b33..00000000000 --- a/packages/core/parcel-bundler/src/WorkerFarm.js +++ /dev/null @@ -1,141 +0,0 @@ -const {EventEmitter} = require('events'); -const os = require('os'); -const Farm = require('worker-farm/lib/farm'); -const promisify = require('./utils/promisify'); -const logger = require('./Logger'); - -let shared = null; - -class WorkerFarm extends Farm { - constructor(options) { - let opts = { - maxConcurrentWorkers: getNumWorkers() - }; - - let workerPath = - parseInt(process.versions.node, 10) < 8 - ? require.resolve('../lib/worker') - : require.resolve('../src/worker'); - - super(opts, workerPath); - - this.localWorker = this.promisifyWorker(require('./worker')); - this.remoteWorker = this.promisifyWorker(this.setup(['init', 'run'])); - - this.started = false; - this.warmWorkers = 0; - this.init(options); - } - - init(options) { - this.localWorker.init(options); - this.initRemoteWorkers(options); - } - - promisifyWorker(worker) { - let res = {}; - - for (let key in worker) { - res[key] = promisify(worker[key].bind(worker)); - } - - return res; - } - - async initRemoteWorkers(options) { - this.started = false; - this.warmWorkers = 0; - - let promises = []; - for (let i = 0; i < this.options.maxConcurrentWorkers; i++) { - promises.push(this.remoteWorker.init(options)); - } - - await Promise.all(promises); - if (this.options.maxConcurrentWorkers > 0) { - this.started = true; - } - } - - receive(data) { - if (data.event) { - this.emit(data.event, ...data.args); - } else if (data.type === 'logger') { - if (this.shouldUseRemoteWorkers()) { - logger.handleMessage(data); - } - } else if (this.children[data.child]) { - super.receive(data); - } - } - - shouldUseRemoteWorkers() { - return this.started && this.warmWorkers >= this.activeChildren; - } - - async run(...args) { - // Child process workers are slow to start (~600ms). - // While we're waiting, just run on the main thread. - // This significantly speeds up startup time. - if (this.shouldUseRemoteWorkers()) { - return this.remoteWorker.run(...args, false); - } else { - // Workers have started, but are not warmed up yet. - // Send the job to a remote worker in the background, - // but use the result from the local worker - it will be faster. - if (this.started) { - this.remoteWorker.run(...args, true).then( - () => { - this.warmWorkers++; - }, - () => { - // ignore error - } - ); - } - - return this.localWorker.run(...args, false); - } - } - - end() { - // Force kill all children - this.ending = true; - for (let child in this.children) { - this.stopChild(child); - } - - this.ending = false; - shared = null; - } - - static getShared(options) { - if (!shared) { - shared = new WorkerFarm(options); - } else { - shared.init(options); - } - - return shared; - } -} - -for (let key in EventEmitter.prototype) { - WorkerFarm.prototype[key] = EventEmitter.prototype[key]; -} - -function getNumWorkers() { - if (process.env.PARCEL_WORKERS) { - return parseInt(process.env.PARCEL_WORKERS, 10); - } - - let cores; - try { - cores = require('physical-cpu-count'); - } catch (err) { - cores = os.cpus().length; - } - return cores || 1; -} - -module.exports = WorkerFarm; diff --git a/packages/core/parcel-bundler/src/utils/localRequire.js b/packages/core/parcel-bundler/src/utils/localRequire.js index 9744ed1a371..4d8b9a0bc9c 100644 --- a/packages/core/parcel-bundler/src/utils/localRequire.js +++ b/packages/core/parcel-bundler/src/utils/localRequire.js @@ -1,6 +1,6 @@ const {dirname} = require('path'); const resolve = require('resolve'); -const install = require('./installPackage'); +const worker = require('../worker'); const cache = new Map(); @@ -13,7 +13,10 @@ async function localRequire(name, path, triedInstall = false) { resolved = resolve.sync(name, {basedir}); } catch (e) { if (e.code === 'MODULE_NOT_FOUND' && !triedInstall) { - await install([name], path); + await worker.addCall({ + location: require.resolve('./installPackage.js'), + args: [[name], path] + }); return localRequire(name, path, true); } throw e; diff --git a/packages/core/parcel-bundler/src/worker.js b/packages/core/parcel-bundler/src/worker.js index 6a5e8d069d4..f763f36d1ed 100644 --- a/packages/core/parcel-bundler/src/worker.js +++ b/packages/core/parcel-bundler/src/worker.js @@ -1,32 +1,39 @@ require('v8-compile-cache'); const Pipeline = require('./Pipeline'); +const child = require('./workerfarm/child'); +const WorkerFarm = require('./workerfarm/WorkerFarm'); let pipeline; -exports.init = function(options, callback) { +function init(options, isLocal = false) { pipeline = new Pipeline(options || {}); Object.assign(process.env, options.env || {}); process.env.HMR_PORT = options.hmrPort; process.env.HMR_HOSTNAME = options.hmrHostname; - callback(); -}; + if (isLocal) { + process.env.WORKER_TYPE = 'parcel-worker'; + } +} -exports.run = async function(path, pkg, options, isWarmUp, callback) { +async function run(path, pkg, options, isWarmUp) { try { options.isWarmUp = isWarmUp; - var result = await pipeline.process(path, pkg, options); - - callback(null, result); - } catch (err) { - let returned = err; - returned.fileName = path; - callback(returned); + return await pipeline.process(path, pkg, options); + } catch (e) { + e.fileName = path; + throw e; } -}; +} -process.on('unhandledRejection', function(err) { - // ERR_IPC_CHANNEL_CLOSED happens when the worker is killed before it finishes processing - if (err.code !== 'ERR_IPC_CHANNEL_CLOSED') { - console.error('Unhandled promise rejection:', err.stack); +// request.location is a module path relative to src or lib +async function addCall(request, awaitResponse = true) { + if (process.send && process.env.WORKER_TYPE === 'parcel-worker') { + return child.addCall(request, awaitResponse); + } else { + return WorkerFarm.getShared().processRequest(request); } -}); +} + +exports.init = init; +exports.run = run; +exports.addCall = addCall; diff --git a/packages/core/parcel-bundler/src/workerfarm/Worker.js b/packages/core/parcel-bundler/src/workerfarm/Worker.js new file mode 100644 index 00000000000..154c2a37790 --- /dev/null +++ b/packages/core/parcel-bundler/src/workerfarm/Worker.js @@ -0,0 +1,142 @@ +const childProcess = require('child_process'); +const {EventEmitter} = require('events'); +const errorUtils = require('./errorUtils'); + +const childModule = + parseInt(process.versions.node, 10) < 8 + ? require.resolve('../../lib/workerfarm/child') + : require.resolve('../../src/workerfarm/child'); + +let WORKER_ID = 0; + +class Worker extends EventEmitter { + constructor(forkModule, options) { + super(); + + this.options = options; + this.id = WORKER_ID++; + + this.sendQueue = []; + this.processQueue = true; + + this.calls = new Map(); + this.exitCode = null; + this.callId = 0; + this.stopped = false; + + this.fork(forkModule); + } + + fork(forkModule) { + let filteredArgs = process.execArgv.filter( + v => !/^--(debug|inspect)/.test(v) + ); + + let options = { + execArgv: filteredArgs, + env: process.env, + cwd: process.cwd() + }; + + this.child = childProcess.fork(childModule, process.argv, options); + this.send({ + type: 'module', + module: forkModule, + child: this.id + }); + + this.child.on('message', this.receive.bind(this)); + + this.child.once('exit', code => { + this.exitCode = code; + this.emit('exit', code); + }); + + this.child.on('error', err => { + this.emit('error', err); + }); + } + + send(data) { + if (!this.processQueue) { + return this.sendQueue.push(data); + } + + let result = this.child.send(data, error => { + if (error && error instanceof Error) { + // Ignore this, the workerfarm handles child errors + return; + } + + this.processQueue = true; + + if (this.sendQueue.length > 0) { + let queueCopy = this.sendQueue.slice(0); + this.sendQueue = []; + queueCopy.forEach(entry => this.send(entry)); + } + }); + + if (!result || /^win/.test(process.platform)) { + // Queue is handling too much messages throttle it + this.processQueue = false; + } + } + + call(call) { + let idx = this.callId++; + this.calls.set(idx, call); + + this.send({ + type: 'request', + idx: idx, + child: this.id, + method: call.method, + args: call.args + }); + } + + receive(data) { + if (this.stopped) { + return; + } + + let idx = data.idx; + let type = data.type; + let content = data.content; + let contentType = data.contentType; + + if (type === 'request') { + this.emit('request', data); + } else if (type === 'response') { + let call = this.calls.get(idx); + if (!call) { + throw new Error( + `Worker Farm: Received message for unknown index for existing child. This should not happen!` + ); + } + + if (contentType === 'error') { + call.reject(errorUtils.jsonToError(content)); + } else { + call.resolve(content); + } + + this.calls.delete(idx); + this.emit('response', data); + } + } + + stop() { + this.stopped = true; + + this.send('die'); + setTimeout(() => { + if (this.exitCode === null) { + this.child.kill('SIGKILL'); + } + }, this.options.forcedKillTime); + } +} + +module.exports = Worker; diff --git a/packages/core/parcel-bundler/src/workerfarm/WorkerFarm.js b/packages/core/parcel-bundler/src/workerfarm/WorkerFarm.js new file mode 100644 index 00000000000..229ce70ffc6 --- /dev/null +++ b/packages/core/parcel-bundler/src/workerfarm/WorkerFarm.js @@ -0,0 +1,284 @@ +const {EventEmitter} = require('events'); +const os = require('os'); +const errorUtils = require('./errorUtils'); +const Worker = require('./Worker'); + +let shared = null; +class WorkerFarm extends EventEmitter { + constructor(options, farmOptions = {}) { + super(); + this.options = Object.assign( + { + maxConcurrentWorkers: WorkerFarm.getNumWorkers(), + maxConcurrentCallsPerWorker: 10, + forcedKillTime: 100, + warmWorkers: true, + useLocalWorker: true, + workerPath: '../worker' + }, + farmOptions + ); + + this.started = false; + this.warmWorkers = 0; + this.children = new Map(); + this.callQueue = []; + + this.localWorker = require(this.options.workerPath); + this.remoteWorker = { + run: this.mkhandle('run') + }; + + this.init(options); + } + + mkhandle(method) { + return function(...args) { + return new Promise((resolve, reject) => { + this.addCall({ + method, + args: args, + retries: 0, + resolve, + reject + }); + }); + }.bind(this); + } + + onError(error, childId) { + // Handle ipc errors + if (error.code === 'ERR_IPC_CHANNEL_CLOSED') { + return this.stopChild(childId); + } + } + + onExit(childId) { + // delay this to give any sends a chance to finish + setTimeout(() => { + let doQueue = false; + let child = this.children.get(childId); + if (child && child.calls.size) { + for (let call of child.calls.values()) { + call.retries++; + this.callQueue.unshift(call); + doQueue = true; + } + } + this.stopChild(childId); + if (doQueue) { + this.processQueue(); + } + }, 10); + } + + startChild() { + let worker = new Worker(this.options.workerPath, this.options); + + worker.on('request', data => { + this.processRequest(data, worker); + }); + + worker.on('response', () => { + // allow any outstanding calls to be processed + this.processQueue(); + }); + + worker.once('exit', () => { + this.onExit(worker.id); + }); + + worker.on('error', err => { + this.onError(err, worker.id); + }); + + this.children.set(worker.id, worker); + } + + stopChild(childId) { + let child = this.children.get(childId); + if (child) { + child.stop(); + this.children.delete(childId); + } + } + + async processQueue() { + if (this.ending || !this.callQueue.length) return; + + if (this.children.size < this.options.maxConcurrentWorkers) { + this.startChild(); + } + + for (let child of this.children.values()) { + if (!this.callQueue.length) { + break; + } + + if (child.calls.size < this.options.maxConcurrentCallsPerWorker) { + child.call(this.callQueue.shift()); + } + } + } + + async processRequest(data, child = false) { + let result = { + idx: data.idx, + type: 'response' + }; + + let method = data.method; + let args = data.args; + let location = data.location; + let awaitResponse = data.awaitResponse; + + if (!location) { + throw new Error('Unknown request'); + } + + const mod = require(location); + try { + let func; + if (method) { + func = mod[method]; + } else { + func = mod; + } + result.contentType = 'data'; + result.content = await func(...args); + } catch (e) { + result.contentType = 'error'; + result.content = errorUtils.errorToJson(e); + } + + if (awaitResponse) { + if (child) { + child.send(result); + } else { + return result; + } + } + } + + addCall(call) { + if (this.ending) return; // don't add anything new to the queue + this.callQueue.push(call); + this.processQueue(); + } + + async end() { + this.ending = true; + for (let childId of this.children.keys()) { + this.stopChild(childId); + } + this.ending = false; + shared = null; + } + + init(options) { + this.localWorker.init(options, true); + this.initRemoteWorkers(options); + } + + async initRemoteWorkers(options) { + this.started = false; + this.warmWorkers = 0; + + // Start workers if there isn't enough workers already + for ( + let i = this.children.size; + i < this.options.maxConcurrentWorkers; + i++ + ) { + this.startChild(); + } + + // Reliable way of initialising workers + let promises = []; + for (let child of this.children.values()) { + promises.push( + new Promise((resolve, reject) => { + child.call({ + method: 'init', + args: [options], + retries: 0, + resolve, + reject + }); + }) + ); + } + + await Promise.all(promises); + if (this.options.maxConcurrentWorkers > 0) { + this.started = true; + this.emit('started'); + } + } + + shouldUseRemoteWorkers() { + return ( + !this.options.useLocalWorker || + (this.started && + (this.warmWorkers >= this.children.size || !this.options.warmWorkers)) + ); + } + + async warmupWorker(...args) { + // Workers have started, but are not warmed up yet. + // Send the job to a remote worker in the background, + // but use the result from the local worker - it will be faster. + if (this.started) { + this.remoteWorker + .run(...args, true) + .then(() => { + this.warmWorkers++; + if (this.warmWorkers >= this.children.size) { + this.emit('warmedup'); + } + }) + .catch(() => {}); + } + } + + async run(...args) { + // Child process workers are slow to start (~600ms). + // While we're waiting, just run on the main thread. + // This significantly speeds up startup time. + if (this.shouldUseRemoteWorkers()) { + return this.remoteWorker.run(...args, false); + } else { + if (this.options.warmWorkers) { + this.warmupWorker(...args); + } + + return this.localWorker.run(...args, false); + } + } + + static getShared(options) { + if (!shared) { + shared = new WorkerFarm(options); + } else if (options) { + shared.init(options); + } + + return shared; + } + + static getNumWorkers() { + if (process.env.PARCEL_WORKERS) { + return parseInt(process.env.PARCEL_WORKERS, 10); + } + + let cores; + try { + cores = require('physical-cpu-count'); + } catch (err) { + cores = os.cpus().length; + } + return cores || 1; + } +} + +module.exports = WorkerFarm; diff --git a/packages/core/parcel-bundler/src/workerfarm/child.js b/packages/core/parcel-bundler/src/workerfarm/child.js new file mode 100644 index 00000000000..1d90afdd7e0 --- /dev/null +++ b/packages/core/parcel-bundler/src/workerfarm/child.js @@ -0,0 +1,140 @@ +const errorUtils = require('./errorUtils'); + +class Child { + constructor() { + this.module = undefined; + this.childId = undefined; + + this.callQueue = []; + this.responseQueue = new Map(); + this.responseId = 0; + this.maxConcurrentCalls = 10; + } + + messageListener(data) { + if (data === 'die') { + return this.end(); + } + + if (data.type === 'module' && data.module && !this.module) { + this.module = require(data.module); + this.childId = data.child; + if (this.module.setChildReference) { + this.module.setChildReference(this); + } + return; + } + + let type = data.type; + if (type === 'response') { + return this.handleResponse(data); + } else if (type === 'request') { + return this.handleRequest(data); + } + } + + async send(data) { + process.send(data, err => { + if (err && err instanceof Error) { + if (err.code === 'ERR_IPC_CHANNEL_CLOSED') { + // IPC connection closed + // no need to keep the worker running if it can't send or receive data + return this.end(); + } + } + }); + } + + async handleRequest(data) { + let idx = data.idx; + let child = data.child; + let method = data.method; + let args = data.args; + + let result = {idx, child, type: 'response'}; + try { + result.contentType = 'data'; + result.content = await this.module[method](...args); + } catch (e) { + result.contentType = 'error'; + result.content = errorUtils.errorToJson(e); + } + + this.send(result); + } + + async handleResponse(data) { + let idx = data.idx; + let contentType = data.contentType; + let content = data.content; + let call = this.responseQueue.get(idx); + + if (contentType === 'error') { + call.reject(errorUtils.jsonToError(content)); + } else { + call.resolve(content); + } + + this.responseQueue.delete(idx); + + // Process the next call + this.processQueue(); + } + + // Keep in mind to make sure responses to these calls are JSON.Stringify safe + async addCall(request, awaitResponse = true) { + let call = request; + call.type = 'request'; + call.child = this.childId; + call.awaitResponse = awaitResponse; + + let promise; + if (awaitResponse) { + promise = new Promise((resolve, reject) => { + call.resolve = resolve; + call.reject = reject; + }); + } + + this.callQueue.push(call); + this.processQueue(); + + return promise; + } + + async sendRequest(call) { + let idx; + if (call.awaitResponse) { + idx = this.responseId++; + this.responseQueue.set(idx, call); + } + this.send({ + idx: idx, + child: call.child, + type: call.type, + location: call.location, + method: call.method, + args: call.args, + awaitResponse: call.awaitResponse + }); + } + + async processQueue() { + if (!this.callQueue.length) { + return; + } + + if (this.responseQueue.size < this.maxConcurrentCalls) { + this.sendRequest(this.callQueue.shift()); + } + } + + end() { + return process.exit(0); + } +} + +let child = new Child(); +process.on('message', child.messageListener.bind(child)); + +module.exports = child; diff --git a/packages/core/parcel-bundler/src/workerfarm/errorUtils.js b/packages/core/parcel-bundler/src/workerfarm/errorUtils.js new file mode 100644 index 00000000000..fd7de98d092 --- /dev/null +++ b/packages/core/parcel-bundler/src/workerfarm/errorUtils.js @@ -0,0 +1,23 @@ +function errorToJson(error) { + let jsonError = { + message: error.message, + stack: error.stack, + name: error.name + }; + // Add all custom codeFrame properties + Object.keys(error).forEach(key => { + jsonError[key] = error[key]; + }); + return jsonError; +} + +function jsonToError(json) { + let error = new Error(json.message); + Object.keys(json).forEach(key => { + error[key] = json[key]; + }); + return error; +} + +exports.errorToJson = errorToJson; +exports.jsonToError = jsonToError; diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/echo.js b/packages/core/parcel-bundler/test/integration/workerfarm/echo.js new file mode 100644 index 00000000000..af05f5c87f2 --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/echo.js @@ -0,0 +1,10 @@ +function run(data) { + return data; +} + +function init() { + // do nothing +} + +exports.run = run; +exports.init = init; \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/init.js b/packages/core/parcel-bundler/test/integration/workerfarm/init.js new file mode 100644 index 00000000000..444d8f5d744 --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/init.js @@ -0,0 +1,12 @@ +let options = {}; + +function run() { + return options; +} + +function init(opt) { + options = opt; +} + +exports.run = run; +exports.init = init; \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/ipc-pid.js b/packages/core/parcel-bundler/test/integration/workerfarm/ipc-pid.js new file mode 100644 index 00000000000..43df6a98dc2 --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/ipc-pid.js @@ -0,0 +1,27 @@ +let options = {}; +let child; + +function setChildReference(childRef) { + child = childRef; +} + +function run() { + let result = [process.pid]; + return new Promise((resolve, reject) => { + child.addCall({ + location: require.resolve('./master-process-id.js'), + args: [] + }).then((pid) => { + result.push(pid) + resolve(result); + }).catch(reject); + }); +} + +function init() { + // Do nothing +} + +exports.run = run; +exports.init = init; +exports.setChildReference = setChildReference; \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/ipc.js b/packages/core/parcel-bundler/test/integration/workerfarm/ipc.js new file mode 100644 index 00000000000..57041cad4fc --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/ipc.js @@ -0,0 +1,23 @@ +let options = {}; +let child; + +function setChildReference(childRef) { + child = childRef; +} + +function run(a, b) { + return new Promise((resolve, reject) => { + child.addCall({ + location: require.resolve('./master-sum.js'), + args: [a, b] + }).then(resolve).catch(reject); + }); +} + +function init() { + // Do nothing +} + +exports.run = run; +exports.init = init; +exports.setChildReference = setChildReference; \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/master-process-id.js b/packages/core/parcel-bundler/test/integration/workerfarm/master-process-id.js new file mode 100644 index 00000000000..b6e286d04ca --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/master-process-id.js @@ -0,0 +1,3 @@ +module.exports = function() { + return process.pid; +} \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/master-sum.js b/packages/core/parcel-bundler/test/integration/workerfarm/master-sum.js new file mode 100644 index 00000000000..f8ffa8f0259 --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/master-sum.js @@ -0,0 +1,3 @@ +module.exports = function(a, b) { + return a + b; +} \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/integration/workerfarm/ping.js b/packages/core/parcel-bundler/test/integration/workerfarm/ping.js new file mode 100644 index 00000000000..da73497f7ff --- /dev/null +++ b/packages/core/parcel-bundler/test/integration/workerfarm/ping.js @@ -0,0 +1,10 @@ +function run() { + return 'pong'; +} + +function init() { + // do nothing +} + +exports.run = run; +exports.init = init; \ No newline at end of file diff --git a/packages/core/parcel-bundler/test/mocha.opts b/packages/core/parcel-bundler/test/mocha.opts index 9a359cfdf21..7930d9635a7 100644 --- a/packages/core/parcel-bundler/test/mocha.opts +++ b/packages/core/parcel-bundler/test/mocha.opts @@ -1,2 +1,2 @@ ---timeout 25000 +--timeout 50000 --require ./test/babel-register diff --git a/packages/core/parcel-bundler/test/workerfarm.js b/packages/core/parcel-bundler/test/workerfarm.js new file mode 100644 index 00000000000..001249b0219 --- /dev/null +++ b/packages/core/parcel-bundler/test/workerfarm.js @@ -0,0 +1,192 @@ +const assert = require('assert'); +const WorkerFarm = require('../src/workerfarm/WorkerFarm'); + +describe('WorkerFarm', () => { + it('Should start up workers', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/ping.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + assert.equal(await workerfarm.run(), 'pong'); + + await workerfarm.end(); + }); + + it('Should handle 1000 requests without any issue', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/echo.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + let promises = []; + for (let i = 0; i < 1000; i++) { + promises.push(workerfarm.run(i)); + } + await Promise.all(promises); + + await workerfarm.end(); + }); + + it('Should consistently initialise workers, even after 100 re-inits', async () => { + let options = { + key: 0 + }; + + let workerfarm = new WorkerFarm(options, { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/init.js') + }); + + for (let i = 0; i < 100; i++) { + options.key = i; + workerfarm.init(options); + await new Promise(resolve => workerfarm.once('started', resolve)); + for (let i = 0; i < workerfarm.children.size; i++) { + assert.equal((await workerfarm.run()).key, options.key); + } + assert.equal(workerfarm.shouldUseRemoteWorkers(), true); + } + + await workerfarm.end(); + }); + + it('Should warm up workers', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: true, + useLocalWorker: true, + workerPath: require.resolve('./integration/workerfarm/echo.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + for (let i = 0; i < 100; i++) { + assert.equal(await workerfarm.run(i), i); + } + + await new Promise(resolve => workerfarm.once('warmedup', resolve)); + + assert(workerfarm.children.size > 0, 'Should have spawned workers.'); + assert( + workerfarm.warmWorkers >= workerfarm.children.size, + 'Should have warmed up workers.' + ); + + await workerfarm.end(); + }); + + it('Should use the local worker', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: true, + useLocalWorker: true, + workerPath: require.resolve('./integration/workerfarm/echo.js') + } + ); + + assert.equal(await workerfarm.run('hello world'), 'hello world'); + assert.equal(workerfarm.shouldUseRemoteWorkers(), false); + + await workerfarm.end(); + }); + + it('Should be able to use bi-directional communication', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/ipc.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + assert.equal(await workerfarm.run(1, 2), 3); + + await workerfarm.end(); + }); + + it('Should be able to handle 1000 bi-directional calls', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/ipc.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + for (let i = 0; i < 1000; i++) { + assert.equal(await workerfarm.run(1 + i, 2), 3 + i); + } + + await workerfarm.end(); + }); + + it('Bi-directional call should return masters pid', async () => { + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/ipc-pid.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + let result = await workerfarm.run(); + assert.equal(result.length, 2); + assert.equal(result[1], process.pid); + assert.notEqual(result[0], process.pid); + + await workerfarm.end(); + }); + + it('Should handle 10 big concurrent requests without any issue', async () => { + // This emulates the node.js ipc bug for win32 + let workerfarm = new WorkerFarm( + {}, + { + warmWorkers: false, + useLocalWorker: false, + workerPath: require.resolve('./integration/workerfarm/echo.js') + } + ); + + await new Promise(resolve => workerfarm.once('started', resolve)); + + let bigData = []; + for (let i = 0; i < 10000; i++) { + bigData.push('This is some big data'); + } + + let promises = []; + for (let i = 0; i < 10; i++) { + promises.push(workerfarm.run(bigData)); + } + await Promise.all(promises); + + await workerfarm.end(); + }); +}); diff --git a/yarn.lock b/yarn.lock index 23065d49ea9..02f1e7aa27d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1973,7 +1973,7 @@ entities@^1.1.1, entities@~1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/entities/-/entities-1.1.1.tgz#6e5c2d0a5621b5dadaecef80b90edfb5cd7772f0" -errno@^0.1.1, errno@~0.1.7: +errno@^0.1.1: version "0.1.7" resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.7.tgz#4684d71779ad39af177e3f007996f7c67c852618" dependencies: @@ -5658,9 +5658,9 @@ resolve@^0.6.1: version "0.6.3" resolved "https://registry.yarnpkg.com/resolve/-/resolve-0.6.3.tgz#dd957982e7e736debdf53b58a4dd91754575dd46" -resolve@^1.0.0, resolve@^1.1.6: - version "1.6.0" - resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.6.0.tgz#0fbd21278b27b4004481c395349e7aba60a9ff5c" +resolve@^1.0.0: + version "1.7.1" + resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.7.1.tgz#aadd656374fd298aee895bc026b8297418677fd3" dependencies: path-parse "^1.0.5" @@ -5670,6 +5670,12 @@ resolve@^1.1.5, resolve@^1.4.0: dependencies: path-parse "^1.0.5" +resolve@^1.1.6: + version "1.6.0" + resolved "https://registry.yarnpkg.com/resolve/-/resolve-1.6.0.tgz#0fbd21278b27b4004481c395349e7aba60a9ff5c" + dependencies: + path-parse "^1.0.5" + restore-cursor@^1.0.1: version "1.0.1" resolved "https://registry.yarnpkg.com/restore-cursor/-/restore-cursor-1.0.1.tgz#34661f46886327fed2991479152252df92daa541" @@ -6694,12 +6700,6 @@ wordwrap@~1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb" -worker-farm@^1.5.2: - version "1.6.0" - resolved "https://registry.yarnpkg.com/worker-farm/-/worker-farm-1.6.0.tgz#aecc405976fab5a95526180846f0dba288f3a4a0" - dependencies: - errno "~0.1.7" - wrap-ansi@^2.0.0: version "2.1.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-2.1.0.tgz#d8fc3d284dd05794fe84973caecdd1cf824fdd85"