Skip to content

Commit

Permalink
fix(core): move plugin worker to socket + server
Browse files Browse the repository at this point in the history
  • Loading branch information
AgentEnder committed Jun 20, 2024
1 parent 11f4045 commit c20872a
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 163 deletions.
3 changes: 2 additions & 1 deletion packages/nx/src/command-line/run/command-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const yargsNxInfixCommand: CommandModule = {
command: '$0 <target> [project] [_..]',
describe: 'Run a target for a project',
handler: async (args) => {
await handleErrors(
const exitCode = await handleErrors(
(args.verbose as boolean) ?? process.env.NX_VERBOSE_LOGGING === 'true',
async () => {
return (await import('./run-one')).runOne(
Expand All @@ -46,5 +46,6 @@ export const yargsNxInfixCommand: CommandModule = {
);
}
);
process.exit(exitCode);
},
};
5 changes: 5 additions & 0 deletions packages/nx/src/daemon/socket-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ export const getForkedProcessOsSocketPath = (id: string) => {
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};

export const getPluginOsSocketPath = (id: string) => {
let path = resolve(join(getSocketDir(), 'plugin' + id + '.sock'));
return isWindows ? '\\\\.\\pipe\\nx\\' + resolve(path) : resolve(path);
};

export function killSocketOrPath(): void {
try {
unlinkSync(getFullOsSocketPath());
Expand Down
14 changes: 12 additions & 2 deletions packages/nx/src/project-graph/plugins/isolation/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import {
CreateDependenciesContext,
CreateMetadataContext,
CreateNodesContext,
CreateNodesContextV2,
} from '../public-api';
import { LoadedNxPlugin } from '../internal-api';
import { Serializable } from 'child_process';
import { Socket } from 'net';

export interface PluginWorkerLoadMessage {
type: 'load';
Expand Down Expand Up @@ -42,7 +44,7 @@ export interface PluginWorkerCreateNodesMessage {
type: 'createNodes';
payload: {
configFiles: string[];
context: CreateNodesContext;
context: CreateNodesContextV2;
tx: string;
};
}
Expand Down Expand Up @@ -192,6 +194,7 @@ type MessageHandlerReturn<T extends PluginWorkerMessage | PluginWorkerResult> =
export async function consumeMessage<
T extends PluginWorkerMessage | PluginWorkerResult
>(
socket: Socket,
raw: T,
handlers: {
[K in T['type']]: (
Expand All @@ -205,7 +208,14 @@ export async function consumeMessage<
if (handler) {
const response = await handler(message.payload);
if (response) {
process.send!(response);
sendMessageOverSocket(socket, response);
}
}
}

export function sendMessageOverSocket(
socket: Socket,
message: PluginWorkerMessage | PluginWorkerResult
) {
socket.write(JSON.stringify(message) + String.fromCodePoint(4));
}
184 changes: 129 additions & 55 deletions packages/nx/src/project-graph/plugins/isolation/plugin-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,16 @@ import { PluginConfiguration } from '../../../config/nx-json';
// import { logger } from '../../utils/logger';

import { LoadedNxPlugin, nxPluginCache } from '../internal-api';
import { consumeMessage, isPluginWorkerResult } from './messaging';
import { getPluginOsSocketPath } from '../../../daemon/socket-utils';
import { consumeMessagesFromSocket } from '../../../utils/consume-messages-from-socket';

import {
consumeMessage,
isPluginWorkerResult,
sendMessageOverSocket,
} from './messaging';
import { Socket, connect } from 'net';
import { unlinkSync } from 'fs';

const cleanupFunctions = new Set<() => void>();

Expand All @@ -19,59 +28,39 @@ interface PendingPromise {
rejector: (err: any) => void;
}

export function loadRemoteNxPlugin(
export async function loadRemoteNxPlugin(
plugin: PluginConfiguration,
root: string
): Promise<LoadedNxPlugin> {
// this should only really be true when running unit tests within
// the Nx repo. We still need to start the worker in this case,
// but its typescript.
const isWorkerTypescript = path.extname(__filename) === '.ts';
const workerPath = path.join(__dirname, 'plugin-worker');

const env: Record<string, string> = {
...process.env,
...(isWorkerTypescript
? {
// Ensures that the worker uses the same tsconfig as the main process
TS_NODE_PROJECT: path.join(
__dirname,
'../../../../tsconfig.lib.json'
),
}
: {}),
};

const worker = fork(workerPath, [], {
stdio: ['ignore', 'inherit', 'inherit', 'ipc'],
env,
execArgv: [
...process.execArgv,
// If the worker is typescript, we need to register ts-node
...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []),
],
});
worker.send({ type: 'load', payload: { plugin, root } });
const { ipcPath, worker } = await startPluginWorker(plugin);

// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);

const pendingPromises = new Map<string, PendingPromise>();
return new Promise<LoadedNxPlugin>((res, rej) => {
const socket = connect(ipcPath, () => {
sendMessageOverSocket(socket, {
type: 'load',
payload: { plugin, root },
});
// logger.verbose(`[plugin-worker] started worker: ${worker.pid}`);

const exitHandler = createWorkerExitHandler(worker, pendingPromises);
const pendingPromises = new Map<string, PendingPromise>();

const cleanupFunction = () => {
worker.off('exit', exitHandler);
shutdownPluginWorker(worker);
};
const exitHandler = createWorkerExitHandler(worker, pendingPromises);

cleanupFunctions.add(cleanupFunction);
const cleanupFunction = () => {
worker.off('exit', exitHandler);
socket.destroy();
shutdownPluginWorker(worker);
};

return new Promise<LoadedNxPlugin>((res, rej) => {
worker.on(
'message',
createWorkerHandler(worker, pendingPromises, res, rej)
);
worker.on('exit', exitHandler);
cleanupFunctions.add(cleanupFunction);
socket.on(
'data',
consumeMessagesFromSocket(
createWorkerHandler(worker, pendingPromises, res, rej, socket)
)
);
worker.on('exit', exitHandler);
});
});
}

Expand All @@ -96,15 +85,18 @@ function createWorkerHandler(
worker: ChildProcess,
pending: Map<string, PendingPromise>,
onload: (plugin: LoadedNxPlugin) => void,
onloadError: (err?: unknown) => void
onloadError: (err?: unknown) => void,
socket: Socket
) {
let pluginName: string;

return function (message: Serializable) {
return function (raw: string) {
const message = JSON.parse(raw);

if (!isPluginWorkerResult(message)) {
return;
}
return consumeMessage(message, {
return consumeMessage(socket, message, {
'load-result': (result) => {
if (result.success) {
const { name, createNodesPattern, include, exclude } = result;
Expand All @@ -120,7 +112,7 @@ function createWorkerHandler(
(configFiles, ctx) => {
const tx = pluginName + ':createNodes:' + performance.now();
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createNodes',
payload: { configFiles, context: ctx, tx },
});
Expand All @@ -133,7 +125,7 @@ function createWorkerHandler(
const tx =
pluginName + ':createDependencies:' + performance.now();
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createDependencies',
payload: { context: ctx, tx },
});
Expand All @@ -145,7 +137,7 @@ function createWorkerHandler(
const tx =
pluginName + ':processProjectGraph:' + performance.now();
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'processProjectGraph',
payload: { graph, ctx, tx },
});
Expand All @@ -157,7 +149,7 @@ function createWorkerHandler(
const tx =
pluginName + ':createMetadata:' + performance.now();
return registerPendingPromise(tx, pending, () => {
worker.send({
sendMessageOverSocket(socket, {
type: 'createMetadata',
payload: { graph, context: ctx, tx },
});
Expand Down Expand Up @@ -222,11 +214,18 @@ function createWorkerExitHandler(
};
}

process.on('exit', () => {
let cleanedUp = false;
const exitHandler = () => {
if (cleanedUp) return;
for (const fn of cleanupFunctions) {
fn();
}
});
cleanedUp = true;
};

process.on('exit', exitHandler);
process.on('SIGINT', exitHandler);
process.on('SIGTERM', exitHandler);

function registerPendingPromise(
tx: string,
Expand All @@ -252,3 +251,78 @@ function registerPendingPromise(

return promise;
}

let workerCount = 0;
async function startPluginWorker(plugin: PluginConfiguration) {
// this should only really be true when running unit tests within
// the Nx repo. We still need to start the worker in this case,
// but its typescript.
const isWorkerTypescript = path.extname(__filename) === '.ts';
const workerPath = path.join(__dirname, 'plugin-worker');

const env: Record<string, string> = {
...process.env,
...(isWorkerTypescript
? {
// Ensures that the worker uses the same tsconfig as the main process
TS_NODE_PROJECT: path.join(
__dirname,
'../../../../tsconfig.lib.json'
),
}
: {}),
};

const ipcPath = getPluginOsSocketPath([process.pid, workerCount++].join('-'));

const worker = fork(workerPath, [ipcPath], {
stdio: process.stdout.isTTY ? 'inherit' : 'ignore',
env,
execArgv: [
...process.execArgv,
// If the worker is typescript, we need to register ts-node
...(isWorkerTypescript ? ['-r', 'ts-node/register'] : []),
],
detached: true,
});
worker.disconnect();
worker.unref();

let attempts = 0;
return new Promise<{
worker: ChildProcess;
ipcPath: string;
}>((resolve, reject) => {
const id = setInterval(async () => {
if (await isServerAvailable(ipcPath)) {
clearInterval(id);
resolve({
worker,
ipcPath,
});
} else if (attempts > 1000) {
// daemon fails to start, the process probably exited
// we print the logs and exit the client
reject('Failed to start plugin worker.');
} else {
attempts++;
}
}, 10);
});
}

function isServerAvailable(ipcPath: string): Promise<boolean> {
return new Promise((resolve) => {
try {
const socket = connect(ipcPath, () => {
socket.destroy();
resolve(true);
});
socket.once('error', () => {
resolve(false);
});
} catch (err) {
resolve(false);
}
});
}
Loading

0 comments on commit c20872a

Please sign in to comment.