Skip to content

Commit

Permalink
server/sdk: wip support for alternative fork main, fork names. add in…
Browse files Browse the repository at this point in the history
…itial workerData message channel.
  • Loading branch information
koush committed Jul 31, 2024
1 parent 2013830 commit d0b46c3
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 53 deletions.
6 changes: 4 additions & 2 deletions common/src/zygote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ import os from 'os';

export type Zygote<T> = () => PluginFork<T>;

export function createZygote<T>(): Zygote<T> {
export function createZygote<T>(name: string): Zygote<T> {
let zygote = sdk.fork<T>();
function* next() {
while (true) {
const cur = zygote;
zygote = sdk.fork<T>();
zygote = sdk.fork<T>({
name,
});
yield cur;
}
}
Expand Down
4 changes: 2 additions & 2 deletions sdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.3.45",
"version": "0.3.46",
"description": "",
"main": "dist/src/index.js",
"exports": {
Expand Down
4 changes: 2 additions & 2 deletions sdk/types/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/types/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.3.43",
"version": "0.3.44",
"description": "",
"main": "dist/index.js",
"author": "",
Expand Down
7 changes: 6 additions & 1 deletion sdk/types/src/types.input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2416,6 +2416,11 @@ export interface ConnectOptions extends APIOptions {
pluginId: string;
}

export interface ForkOptions {
name?: string;
filename?: string;
}

export interface ScryptedStatic {
/**
* @deprecated
Expand All @@ -2436,7 +2441,7 @@ export interface ScryptedStatic {
* Start a new instance of the plugin, returning an instance of the new process
* and the result of the fork method.
*/
fork<T>(): PluginFork<T>;
fork<T>(options?: ForkOptions): PluginFork<T>;
/**
* Initiate the Scrypted RPC wire protocol on a socket.
* @param socket
Expand Down
16 changes: 8 additions & 8 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"@mapbox/node-pre-gyp": "^1.0.11",
"@scrypted/ffmpeg-static": "^6.1.0-build1",
"@scrypted/node-pty": "^1.0.18",
"@scrypted/types": "^0.3.43",
"@scrypted/types": "^0.3.44",
"adm-zip": "^0.5.14",
"body-parser": "^1.20.2",
"cookie-parser": "^1.4.6",
Expand Down Expand Up @@ -46,7 +46,7 @@
"@types/semver": "^7.5.8",
"@types/source-map-support": "^0.5.10",
"@types/whatwg-mimetype": "^3.0.2",
"@types/ws": "^8.5.11"
"@types/ws": "^8.5.12"
},
"bin": {
"scrypted-serve": "bin/scrypted-serve"
Expand Down
1 change: 1 addition & 0 deletions server/src/plugin/plugin-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ export interface PluginRemoteLoadZipOptions {
debug?: boolean;
zipHash: string;
fork?: boolean;
main?: string;

clusterId: string;
clusterSecret: string;
Expand Down
19 changes: 13 additions & 6 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
throw new Error(`unknown service ${name}`);
},
async onLoadZip(scrypted: ScryptedStatic, params: any, packageJson: any, getZip: () => Promise<Buffer>, zipOptions: PluginRemoteLoadZipOptions) {
const mainFile = zipOptions?.main || 'main';
const mainNodejs = `${mainFile}.nodejs.js`;
const pluginMainNodeJs = `/plugin/${mainNodejs}`;
const pluginIdMainNodeJs = `/${pluginId}/${mainNodejs}`;

const { clusterId, clusterSecret, zipHash } = zipOptions;
const { zipFile, unzippedPath } = await prepareZip(getPluginVolume(pluginId), zipHash, getZip);

Expand Down Expand Up @@ -266,7 +271,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
// params.window = window;
params.exports = exports;

const entry = pluginReader('main.nodejs.js.map')
const entry = pluginReader(`${mainNodejs}.map`)
const map = entry?.toString();

// plugins may install their own sourcemap support during startup, so
Expand All @@ -287,11 +292,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
installSourceMapSupport({
environment: 'node',
retrieveSourceMap(source) {
if (source === '/plugin/main.nodejs.js' || source === `/${pluginId}/main.nodejs.js`) {
if (source === pluginMainNodeJs || source === pluginIdMainNodeJs) {
if (!map)
return null;
return {
url: '/plugin/main.nodejs.js',
url: pluginMainNodeJs,
map,
}
}
Expand All @@ -314,7 +319,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
await pong(time);
};

const main = pluginReader('main.nodejs.js');
const main = pluginReader(mainNodejs);
const script = main.toString();

scrypted.connect = (socket, options) => {
Expand All @@ -323,14 +328,16 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe

const pluginRemoteAPI: PluginRemote = scrypted.pluginRemoteAPI;

scrypted.fork = () => {
scrypted.fork = (options) => {
const ntw = new NodeThreadWorker(mainFilename, pluginId, {
packageJson,
env: process.env,
pluginDebug: undefined,
zipFile,
unzippedPath,
zipHash,
}, {
name: options?.name,
});

const result = (async () => {
Expand Down Expand Up @@ -389,7 +396,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
}

try {
const filename = zipOptions?.debug ? '/plugin/main.nodejs.js' : `/${pluginId}/main.nodejs.js`;
const filename = zipOptions?.debug ? pluginMainNodeJs : pluginIdMainNodeJs;
evalLocal(peer, script, filename, params);

if (zipOptions?.fork) {
Expand Down
15 changes: 8 additions & 7 deletions server/src/plugin/runtime/node-thread-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
worker: worker_threads.Worker;
port: worker_threads.MessagePort;

constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions) {
constructor(mainFilename: string, public pluginId: string, options: RuntimeWorkerOptions, workerOptions?: worker_threads.WorkerOptions) {
super();
const { env } = options;

const message = new worker_threads.MessageChannel();
const { port1, port2 } = message;
this.worker = new worker_threads.Worker(mainFilename, {
argv: ['child-thread', this.pluginId],
env: Object.assign({}, process.env, env),
workerData: {
port: port1,
},
transferList: [port1],
...workerOptions,
});

this.worker.on('exit', () => {
Expand All @@ -27,19 +34,13 @@ export class NodeThreadWorker extends EventEmitter implements RuntimeWorker {
this.emit('error', e);
});

const message = new worker_threads.MessageChannel();
const { port1, port2 } = message;
this.port = port2;
this.port.on('messageerror', e => {
this.emit('error', e);
});
this.port.on('close', () => {
this.emit('error', new Error('port closed'));
});

this.worker.postMessage({
port: port1,
}, [port1]);
}

get pid() {
Expand Down
40 changes: 19 additions & 21 deletions server/src/scrypted-plugin-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,25 @@ function start(mainFilename: string) {
module.paths.push(getPluginNodePath(pluginId));

if (process.argv[2] === 'child-thread') {
worker_threads.parentPort.once('message', message => {
const { port } = message as { port: worker_threads.MessagePort };
const peer = startPluginRemote(mainFilename, pluginId, (message, reject) => {
try {
port.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
}
});
peer.transportSafeArgumentTypes.add(Buffer.name);
peer.transportSafeArgumentTypes.add(Uint8Array.name);
port.on('message', message => peer.handleMessage(v8.deserialize(message)));
port.on('messageerror', e => {
console.error('message error', e);
process.exit(1);
});
port.on('close', () => {
console.error('port closed');
process.exit(1);
});
const { port } = worker_threads.workerData as { port: worker_threads.MessagePort };
const peer = startPluginRemote(mainFilename, pluginId, (message, reject) => {
try {
port.postMessage(v8.serialize(message));
}
catch (e) {
reject?.(e);
}
});
peer.transportSafeArgumentTypes.add(Buffer.name);
peer.transportSafeArgumentTypes.add(Uint8Array.name);
port.on('message', message => peer.handleMessage(v8.deserialize(message)));
port.on('messageerror', e => {
console.error('message error', e);
process.exit(1);
});
port.on('close', () => {
console.error('port closed');
process.exit(1);
});
}
else {
Expand Down

0 comments on commit d0b46c3

Please sign in to comment.