From d0b46c35a993a9c73d5eca19b3ab256da51809dd Mon Sep 17 00:00:00 2001 From: Koushik Dutta Date: Tue, 30 Jul 2024 22:48:52 -0700 Subject: [PATCH] server/sdk: wip support for alternative fork main, fork names. add initial workerData message channel. --- common/src/zygote.ts | 6 ++- sdk/package-lock.json | 4 +- sdk/package.json | 2 +- sdk/types/package-lock.json | 4 +- sdk/types/package.json | 2 +- sdk/types/src/types.input.ts | 7 +++- server/package-lock.json | 16 ++++---- server/package.json | 4 +- server/src/plugin/plugin-api.ts | 1 + server/src/plugin/plugin-remote-worker.ts | 19 ++++++--- .../src/plugin/runtime/node-thread-worker.ts | 15 +++---- server/src/scrypted-plugin-main.ts | 40 +++++++++---------- 12 files changed, 67 insertions(+), 53 deletions(-) diff --git a/common/src/zygote.ts b/common/src/zygote.ts index 611eb36a05..a43a96c103 100644 --- a/common/src/zygote.ts +++ b/common/src/zygote.ts @@ -5,12 +5,14 @@ import os from 'os'; export type Zygote = () => PluginFork; -export function createZygote(): Zygote { +export function createZygote(name: string): Zygote { let zygote = sdk.fork(); function* next() { while (true) { const cur = zygote; - zygote = sdk.fork(); + zygote = sdk.fork({ + name, + }); yield cur; } } diff --git a/sdk/package-lock.json b/sdk/package-lock.json index 1d3b374b96..17cab378d6 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.45", + "version": "0.3.46", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.45", + "version": "0.3.46", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.18.6", diff --git a/sdk/package.json b/sdk/package.json index a1cf1370c2..19e087fbb0 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.45", + "version": "0.3.46", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index ac78652caf..76655a405e 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.43", + "version": "0.3.44", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.43", + "version": "0.3.44", "license": "ISC", "devDependencies": { "@types/node": "^18.19.15", diff --git a/sdk/types/package.json b/sdk/types/package.json index da3cb4f829..8fde6ad585 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.43", + "version": "0.3.44", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index d82c6e364e..b7ebc81a54 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -2416,6 +2416,11 @@ export interface ConnectOptions extends APIOptions { pluginId: string; } +export interface ForkOptions { + name?: string; + filename?: string; +} + export interface ScryptedStatic { /** * @deprecated @@ -2436,7 +2441,7 @@ export interface ScryptedStatic { * Start a new instance of the plugin, returning an instance of the new process * and the result of the fork method. */ - fork(): PluginFork; + fork(options?: ForkOptions): PluginFork; /** * Initiate the Scrypted RPC wire protocol on a socket. * @param socket diff --git a/server/package-lock.json b/server/package-lock.json index a0433eb614..849f18e84a 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -13,7 +13,7 @@ "@mapbox/node-pre-gyp": "^1.0.11", "@scrypted/ffmpeg-static": "^6.1.0-build1", "@scrypted/node-pty": "^1.0.18", - "@scrypted/types": "^0.3.43", + "@scrypted/types": "^0.3.44", "adm-zip": "^0.5.14", "body-parser": "^1.20.2", "cookie-parser": "^1.4.6", @@ -56,7 +56,7 @@ "@types/semver": "^7.5.8", "@types/source-map-support": "^0.5.10", "@types/whatwg-mimetype": "^3.0.2", - "@types/ws": "^8.5.11" + "@types/ws": "^8.5.12" } }, "node_modules/@emnapi/runtime": { @@ -711,9 +711,9 @@ } }, "node_modules/@scrypted/types": { - "version": "0.3.43", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.43.tgz", - "integrity": "sha512-7HcjxTxuFXcYgdf10y1UWhA8SYsS9V96YaTd/CToIE4fJtGS5pwwbrE24Wa3r6B582CqtTBUHA4Zt8OFiSCQAw==" + "version": "0.3.44", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.44.tgz", + "integrity": "sha512-sHqk2gLeXxKCbX3Bg7XQOjTGVDQPhokg9ASIgzGaaswI4gYJRXLv4XMCOeRZQJY/he9FxJOgVZxagDIQbkF9lg==" }, "node_modules/@types/adm-zip": { "version": "0.5.5", @@ -908,9 +908,9 @@ "dev": true }, "node_modules/@types/ws": { - "version": "8.5.11", - "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.11.tgz", - "integrity": "sha512-4+q7P5h3SpJxaBft0Dzpbr6lmMaqh0Jr2tbhJZ/luAwvD7ohSCniYkwz/pLxuT2h0EOa6QADgJj1Ko+TzRfZ+w==", + "version": "8.5.12", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.12.tgz", + "integrity": "sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==", "dev": true, "dependencies": { "@types/node": "*" diff --git a/server/package.json b/server/package.json index ecc5dfd366..3690136c14 100644 --- a/server/package.json +++ b/server/package.json @@ -6,7 +6,7 @@ "@mapbox/node-pre-gyp": "^1.0.11", "@scrypted/ffmpeg-static": "^6.1.0-build1", "@scrypted/node-pty": "^1.0.18", - "@scrypted/types": "^0.3.43", + "@scrypted/types": "^0.3.44", "adm-zip": "^0.5.14", "body-parser": "^1.20.2", "cookie-parser": "^1.4.6", @@ -46,7 +46,7 @@ "@types/semver": "^7.5.8", "@types/source-map-support": "^0.5.10", "@types/whatwg-mimetype": "^3.0.2", - "@types/ws": "^8.5.11" + "@types/ws": "^8.5.12" }, "bin": { "scrypted-serve": "bin/scrypted-serve" diff --git a/server/src/plugin/plugin-api.ts b/server/src/plugin/plugin-api.ts index 2c9525183f..47e1d30aef 100644 --- a/server/src/plugin/plugin-api.ts +++ b/server/src/plugin/plugin-api.ts @@ -161,6 +161,7 @@ export interface PluginRemoteLoadZipOptions { debug?: boolean; zipHash: string; fork?: boolean; + main?: string; clusterId: string; clusterSecret: string; diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index a8d5604fa2..fb65d658ae 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -93,6 +93,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe throw new Error(`unknown service ${name}`); }, async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, getZip: () => Promise, zipOptions: PluginRemoteLoadZipOptions) { + const mainFile = zipOptions?.main || 'main'; + const mainNodejs = `${mainFile}.nodejs.js`; + const pluginMainNodeJs = `/plugin/${mainNodejs}`; + const pluginIdMainNodeJs = `/${pluginId}/${mainNodejs}`; + const { clusterId, clusterSecret, zipHash } = zipOptions; const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip); @@ -266,7 +271,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe // params.window = window; params.exports = exports; - const entry = pluginReader('main.nodejs.js.map') + const entry = pluginReader(`${mainNodejs}.map`) const map = entry?.toString(); // plugins may install their own sourcemap support during startup, so @@ -287,11 +292,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe installSourceMapSupport({ environment: 'node', retrieveSourceMap(source) { - if (source === '/plugin/main.nodejs.js' || source === `/${pluginId}/main.nodejs.js`) { + if (source === pluginMainNodeJs || source === pluginIdMainNodeJs) { if (!map) return null; return { - url: '/plugin/main.nodejs.js', + url: pluginMainNodeJs, map, } } @@ -314,7 +319,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe await pong(time); }; - const main = pluginReader('main.nodejs.js'); + const main = pluginReader(mainNodejs); const script = main.toString(); scrypted.connect = (socket, options) => { @@ -323,7 +328,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe const pluginRemoteAPI: PluginRemote = scrypted.pluginRemoteAPI; - scrypted.fork = () => { + scrypted.fork = (options) => { const ntw = new NodeThreadWorker(mainFilename, pluginId, { packageJson, env: process.env, @@ -331,6 +336,8 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe zipFile, unzippedPath, zipHash, + }, { + name: options?.name, }); const result = (async () => { @@ -389,7 +396,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe } try { - const filename = zipOptions?.debug ? '/plugin/main.nodejs.js' : `/${pluginId}/main.nodejs.js`; + const filename = zipOptions?.debug ? pluginMainNodeJs : pluginIdMainNodeJs; evalLocal(peer, script, filename, params); if (zipOptions?.fork) { diff --git a/server/src/plugin/runtime/node-thread-worker.ts b/server/src/plugin/runtime/node-thread-worker.ts index 5952061d15..20785b1376 100644 --- a/server/src/plugin/runtime/node-thread-worker.ts +++ b/server/src/plugin/runtime/node-thread-worker.ts @@ -8,13 +8,20 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { worker: worker_threads.Worker; port: worker_threads.MessagePort; - constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions) { + constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions, workerOptions?: worker_threads.WorkerOptions) { super(); const { env } = options; + const message = new worker_threads.MessageChannel(); + const { port1, port2 } = message; this.worker = new worker_threads.Worker(mainFilename, { argv: ['child-thread', this.pluginId], env: Object.assign({}, process.env, env), + workerData: { + port: port1, + }, + transferList: [port1], + ...workerOptions, }); this.worker.on('exit', () => { @@ -27,8 +34,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { this.emit('error', e); }); - const message = new worker_threads.MessageChannel(); - const { port1, port2 } = message; this.port = port2; this.port.on('messageerror', e => { this.emit('error', e); @@ -36,10 +41,6 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker { this.port.on('close', () => { this.emit('error', new Error('port closed')); }); - - this.worker.postMessage({ - port: port1, - }, [port1]); } get pid() { diff --git a/server/src/scrypted-plugin-main.ts b/server/src/scrypted-plugin-main.ts index 77c4bac06b..ea1f2fe35f 100644 --- a/server/src/scrypted-plugin-main.ts +++ b/server/src/scrypted-plugin-main.ts @@ -12,27 +12,25 @@ function start(mainFilename: string) { module.paths.push(getPluginNodePath(pluginId)); if (process.argv[2] === 'child-thread') { - worker_threads.parentPort.once('message', message => { - const { port } = message as { port: worker_threads.MessagePort }; - const peer = startPluginRemote(mainFilename, pluginId, (message, reject) => { - try { - port.postMessage(v8.serialize(message)); - } - catch (e) { - reject?.(e); - } - }); - peer.transportSafeArgumentTypes.add(Buffer.name); - peer.transportSafeArgumentTypes.add(Uint8Array.name); - port.on('message', message => peer.handleMessage(v8.deserialize(message))); - port.on('messageerror', e => { - console.error('message error', e); - process.exit(1); - }); - port.on('close', () => { - console.error('port closed'); - process.exit(1); - }); + const { port } = worker_threads.workerData as { port: worker_threads.MessagePort }; + const peer = startPluginRemote(mainFilename, pluginId, (message, reject) => { + try { + port.postMessage(v8.serialize(message)); + } + catch (e) { + reject?.(e); + } + }); + peer.transportSafeArgumentTypes.add(Buffer.name); + peer.transportSafeArgumentTypes.add(Uint8Array.name); + port.on('message', message => peer.handleMessage(v8.deserialize(message))); + port.on('messageerror', e => { + console.error('message error', e); + process.exit(1); + }); + port.on('close', () => { + console.error('port closed'); + process.exit(1); }); } else {