From c4f67234cff338b05a1e8eb0db6e8ae7613bdb4f Mon Sep 17 00:00:00 2001 From: Mark Wubben Date: Mon, 1 Nov 2021 14:22:41 +0100 Subject: [PATCH] Use thread IDs --- lib/fork.js | 8 +------- lib/plugin-support/shared-worker-loader.js | 4 ++-- lib/plugin-support/shared-workers.js | 10 +++------- lib/worker/channel.cjs | 6 ++---- 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/lib/fork.js b/lib/fork.js index dce4f9331..17fce418e 100644 --- a/lib/fork.js +++ b/lib/fork.js @@ -14,8 +14,6 @@ export function _testOnlyReplaceWorkerPath(replacement) { workerPath = replacement; } -let forkCounter = 0; - const additionalExecArgv = ['--enable-source-maps']; const createWorker = (options, execArgv) => { @@ -68,9 +66,6 @@ const createWorker = (options, execArgv) => { }; export default function loadFork(file, options, execArgv = process.execArgv) { - // TODO: this can be changed to use `threadId` when using worker_threads - const forkId = `fork/${++forkCounter}`; - let finished = false; const emitter = new Emittery(); @@ -83,7 +78,6 @@ export default function loadFork(file, options, execArgv = process.execArgv) { options = { baseDir: process.cwd(), file, - forkId, ...options, }; @@ -161,7 +155,7 @@ export default function loadFork(file, options, execArgv = process.execArgv) { return { file, - forkId, + threadId: worker.threadId, promise, exit() { diff --git a/lib/plugin-support/shared-worker-loader.js b/lib/plugin-support/shared-worker-loader.js index c889d04e6..ce9540f25 100644 --- a/lib/plugin-support/shared-worker-loader.js +++ b/lib/plugin-support/shared-worker-loader.js @@ -1,6 +1,6 @@ import {EventEmitter, on} from 'node:events'; import process from 'node:process'; -import {workerData, parentPort} from 'node:worker_threads'; +import {workerData, parentPort, threadId} from 'node:worker_threads'; import pkg from '../pkg.cjs'; @@ -117,7 +117,7 @@ async function * receiveMessages(fromTestWorker, replyTo) { } let messageCounter = 0; -const messageIdPrefix = `${workerData.id}/message`; +const messageIdPrefix = `${threadId}/message`; const nextMessageId = () => `${messageIdPrefix}/${++messageCounter}`; function publishMessage(testWorker, data, replyTo) { diff --git a/lib/plugin-support/shared-workers.js b/lib/plugin-support/shared-workers.js index 4441e086c..94030ced5 100644 --- a/lib/plugin-support/shared-workers.js +++ b/lib/plugin-support/shared-workers.js @@ -6,7 +6,6 @@ import serializeError from '../serialize-error.js'; const LOADER = new URL('shared-worker-loader.js', import.meta.url); -let sharedWorkerCounter = 0; const launchedWorkers = new Map(); const waitForAvailable = async worker => { @@ -22,14 +21,11 @@ function launchWorker(filename, initialData) { return launchedWorkers.get(filename); } - // TODO: remove the custom id and use the built-in thread id. - const id = `shared-worker/${++sharedWorkerCounter}`; const worker = new Worker(LOADER, { // Ensure the worker crashes for unhandled rejections, rather than allowing undefined behavior. execArgv: ['--unhandled-rejections=strict'], workerData: { filename, - id, initialData, }, }); @@ -69,7 +65,7 @@ export async function observeWorkerProcess(fork, runStatus) { const launched = launchWorker(filename, initialData); const handleWorkerMessage = async message => { - if (message.type === 'deregistered-test-worker' && message.id === fork.forkId) { + if (message.type === 'deregistered-test-worker' && message.id === fork.threadId) { launched.worker.off('message', handleWorkerMessage); registrationCount--; @@ -95,7 +91,7 @@ export async function observeWorkerProcess(fork, runStatus) { launched.worker.postMessage({ type: 'register-test-worker', - id: fork.forkId, + id: fork.threadId, file: pathToFileURL(fork.file).toString(), port, }, [port]); @@ -103,7 +99,7 @@ export async function observeWorkerProcess(fork, runStatus) { fork.promise.finally(() => { launched.worker.postMessage({ type: 'deregister-test-worker', - id: fork.forkId, + id: fork.threadId, }); }); diff --git a/lib/worker/channel.cjs b/lib/worker/channel.cjs index bb34765e1..41a55e37d 100644 --- a/lib/worker/channel.cjs +++ b/lib/worker/channel.cjs @@ -1,13 +1,12 @@ 'use strict'; const events = require('events'); const process = require('process'); -const {MessageChannel} = require('worker_threads'); +const {MessageChannel, threadId} = require('worker_threads'); const pEvent = require('p-event'); const timers = require('../now-and-timers.cjs'); -const {get: getOptions} = require('./options.cjs'); const {isRunningInChildProcess, isRunningInThread} = require('./utils.cjs'); const selectAvaMessage = type => message => message.ava && message.ava.type === type; @@ -150,8 +149,7 @@ function createChannelEmitter(channelId) { } function registerSharedWorker(filename, initialData) { - const {forkId} = getOptions(); - const channelId = `${forkId}/channel/${++channelCounter}`; + const channelId = `${threadId}/channel/${++channelCounter}`; const {port1: ourPort, port2: theirPort} = new MessageChannel(); const sharedWorkerHandle = new MessagePortHandle(ourPort);