Skip to content

Commit

Permalink
fix(core): ensure workers used if multiple instances of plugin pool a…
Browse files Browse the repository at this point in the history
…re created
  • Loading branch information
AgentEnder committed Jun 24, 2024
1 parent 3919a13 commit 6d905f9
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 24 deletions.
2 changes: 1 addition & 1 deletion packages/nx/src/project-graph/plugins/internal-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ export async function loadNxPlugins(

const cleanupFunctions: Array<() => void> = [];
for (const plugin of plugins) {
const [loadedPluginPromise, cleanup] = loadingMethod(plugin, root);
const [loadedPluginPromise, cleanup] = await loadingMethod(plugin, root);
result.push(loadedPluginPromise);
cleanupFunctions.push(cleanup);
}
Expand Down
6 changes: 3 additions & 3 deletions packages/nx/src/project-graph/plugins/isolation/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,17 @@ const remotePluginCache = new Map<
readonly [Promise<LoadedNxPlugin>, () => void]
>();

export function loadNxPluginInIsolation(
export async function loadNxPluginInIsolation(
plugin: PluginConfiguration,
root = workspaceRoot
): readonly [Promise<LoadedNxPlugin>, () => void] {
): Promise<readonly [Promise<LoadedNxPlugin>, () => void]> {
const cacheKey = JSON.stringify(plugin);

if (remotePluginCache.has(cacheKey)) {
return remotePluginCache.get(cacheKey);
}

const [loadingPlugin, cleanup] = loadRemoteNxPlugin(plugin, root);
const [loadingPlugin, cleanup] = await loadRemoteNxPlugin(plugin, root);
// We clean up plugin workers when Nx process completes.
const val = [
loadingPlugin,
Expand Down
55 changes: 35 additions & 20 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,21 @@ interface PendingPromise {
rejector: (err: any) => void;
}

type NxPluginWorkerCache = Map<string, Promise<LoadedNxPlugin>>;

const nxPluginWorkerCache: NxPluginWorkerCache = (global[
'nxPluginWorkerCache'
] ??= new Map());

export async function loadRemoteNxPlugin(
plugin: PluginConfiguration,
root: string
): Promise<[Promise<LoadedNxPlugin>, () => void]> {
const cacheKey = JSON.stringify(plugin);
if (nxPluginWorkerCache.has(cacheKey)) {
return [nxPluginWorkerCache.get(cacheKey), () => {}];
}

const { ipcPath, worker } = await startPluginWorker(plugin);

const socket = await new Promise<Socket>((res, rej) => {
Expand All @@ -48,28 +59,30 @@ export async function loadRemoteNxPlugin(
worker.off('exit', exitHandler);
socket.destroy();
shutdownPluginWorker(worker);
nxPluginWorkerCache.delete(cacheKey);
};

cleanupFunctions.add(cleanupFunction);

return [
new Promise<LoadedNxPlugin>((res, rej) => {
sendMessageOverSocket(socket, {
type: 'load',
payload: { plugin, root },
});
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);
const pluginPromise = new Promise<LoadedNxPlugin>((res, rej) => {
sendMessageOverSocket(socket, {
type: 'load',
payload: { plugin, root },
});
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);

socket.on(
'data',
consumeMessagesFromSocket(
createWorkerHandler(worker, pendingPromises, res, rej, socket)
)
);
worker.on('exit', exitHandler);
});

socket.on(
'data',
consumeMessagesFromSocket(
createWorkerHandler(worker, pendingPromises, res, rej, socket)
)
);
worker.on('exit', exitHandler);
}),
cleanupFunction,
];
nxPluginWorkerCache.set(cacheKey, pluginPromise);

return [pluginPromise, cleanupFunction];
}

function shutdownPluginWorker(worker: ChildProcess) {
Expand Down Expand Up @@ -251,7 +264,7 @@ function registerPendingPromise(

callback();
}).finally(() => {
// pending.delete(tx);
pending.delete(tx);
});

pending.set(tx, {
Expand All @@ -263,7 +276,7 @@ function registerPendingPromise(
return promise;
}

let workerCount = 0;
global.nxPluginWorkerCount ??= 0;
async function startPluginWorker(plugin: PluginConfiguration) {
// this should only really be true when running unit tests within
// the Nx repo. We still need to start the worker in this case,
Expand All @@ -284,7 +297,9 @@ async function startPluginWorker(plugin: PluginConfiguration) {
: {}),
};

const ipcPath = getPluginOsSocketPath([process.pid, workerCount++].join('-'));
const ipcPath = getPluginOsSocketPath(
[process.pid, global.nxPluginWorkerCount++].join('-')
);

const worker = fork(workerPath, [ipcPath], {
stdio: process.stdout.isTTY ? 'inherit' : 'ignore',
Expand Down

0 comments on commit 6d905f9

Please sign in to comment.