Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast to workers #113

Closed
wants to merge 3 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor to use all the old functions
mariusandra committed Mar 1, 2021
commit 953c8880b1980ade7934463282040243b4d02ce2
119 changes: 47 additions & 72 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -699,7 +699,8 @@ class ThreadPool {
task : any,
transferList : TransferList,
filename : string | null,
abortSignal : AbortSignalAny | null) : Promise<any> {
abortSignal : AbortSignalAny | null,
workerInfo : WorkerInfo | null) : Promise<any> {
if (filename === null) {
filename = this.options.filename;
}
@@ -753,7 +754,7 @@ class ThreadPool {

// If there is a task queue, there's no point in looking for an available
// Worker thread. Add this task to the queue, if possible.
if (this.taskQueue.size > 0) {
if (!workerInfo && this.taskQueue.size > 0) {
const totalCapacity = this.options.maxQueue + this.pendingCapacity();
if (this.taskQueue.size >= totalCapacity) {
if (this.options.maxQueue === 0) {
@@ -771,33 +772,35 @@ class ThreadPool {
return ret;
}

// Look for a Worker with a minimum number of tasks it is currently running.
let workerInfo : WorkerInfo | null = this.workers.findAvailable();

// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) {
workerInfo = null;
}
if (!workerInfo) {
// Look for a Worker with a minimum number of tasks it is currently running.
workerInfo = this.workers.findAvailable();

// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}
// If we want the ability to abort this task, use only workers that have
// no running tasks.
if (workerInfo !== null && workerInfo.currentUsage() > 0 && abortSignal) {
workerInfo = null;
}

// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
// If no Worker was found, or that Worker was handling another task in some
// way, and we still have the ability to spawn new threads, do so.
let waitingForNewWorker = false;
if ((workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads) {
this._addNewWorker();
waitingForNewWorker = true;
}

return ret;
// If no Worker is found, try to put the task into the queue.
if (workerInfo === null) {
if (this.options.maxQueue <= 0 && !waitingForNewWorker) {
return Promise.reject(Errors.NoTaskQueueAvailable());
} else {
this.taskQueue.push(taskInfo);
}

return ret;
}
}

// TODO(addaleax): Clean up the waitTime/runTime recording.
@@ -809,45 +812,6 @@ class ThreadPool {
return ret;
}

broadcastTask (task : any) : Promise<any> {
const filename = this.options.filename;
if (typeof filename !== 'string') {
return Promise.reject(Errors.FilenameNotProvided());
}

const promises = [];
for (const workerInfo of this.workers) {
let resolve : (result : any) => void;
let reject : (err : Error) => void;
// eslint-disable-next-line
const ret = new Promise((res, rej) => { resolve = res; reject = rej; });
const taskInfo = new TaskInfo(
task, undefined, filename, (err : Error | null, result : any) => {
this.completed++;
if (taskInfo.started) {
this.runTime.recordValue(performance.now() - taskInfo.started);
}
if (err !== null) {
reject(err);
} else {
resolve(result);
}
},
null,
this.publicInterface.asyncResource.asyncId());

const now = performance.now();
this.waitTime.recordValue(now - taskInfo.created);
taskInfo.started = now;
workerInfo.postTask(taskInfo);
promises.push(ret);
}

this._maybeDrain();

return Promise.all(promises);
}

pendingCapacity () : number {
return this.workers.pendingItems.size *
this.options.concurrentTasksPerWorker;
@@ -940,12 +904,12 @@ class Piscina extends EventEmitterAsyncResource {
this.#pool = new ThreadPool(this, options);
}

runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;
runTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined, workerInfo? : WorkerInfo) : Promise<any>;

runTask (task : any, transferList? : any, filename? : any, abortSignal? : any) {
runTask (task : any, transferList? : any, filename? : any, abortSignal? : any, workerInfo? : WorkerInfo) {
// If transferList is a string or AbortSignal, shift it.
if ((typeof transferList === 'object' && !Array.isArray(transferList)) ||
typeof transferList === 'string') {
@@ -972,11 +936,22 @@ class Piscina extends EventEmitterAsyncResource {
new TypeError('abortSignal argument must be an object'));
}
return this.#pool.runTask(
task, transferList, filename || null, abortSignal || null);
task, transferList, filename || null, abortSignal || null, workerInfo || null);
}

broadcastTask (task : any) {
return this.#pool.broadcastTask(task);
broadcastTask (task : any, transferList? : TransferList, filename? : string, abortSignal? : AbortSignalAny) : Promise<any[]>;
broadcastTask (task : any, transferList? : TransferList, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any[]>;
broadcastTask (task : any, transferList? : string, filename? : AbortSignalAny, abortSignal? : undefined) : Promise<any[]>;
broadcastTask (task : any, transferList? : AbortSignalAny, filename? : undefined, abortSignal? : undefined) : Promise<any[]>;

broadcastTask (task : any, transferList? : any, filename? : any, abortSignal? : any) {
const promises = [];

for (const workerInfo of this.#pool.workers) {
promises.push(this.runTask(task, transferList, filename, abortSignal, workerInfo));
}

return Promise.all(promises);
}

destroy () {