diff --git a/sdk/package-lock.json b/sdk/package-lock.json index b53ebcdad3..f59e94bdef 100644 --- a/sdk/package-lock.json +++ b/sdk/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/sdk", - "version": "0.3.73", + "version": "0.3.74", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/sdk", - "version": "0.3.73", + "version": "0.3.74", "license": "ISC", "dependencies": { "@babel/preset-typescript": "^7.26.0", diff --git a/sdk/package.json b/sdk/package.json index f53ab6e44f..f9cc13e5d9 100644 --- a/sdk/package.json +++ b/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/sdk", - "version": "0.3.73", + "version": "0.3.74", "description": "", "main": "dist/src/index.js", "exports": { diff --git a/sdk/types/package-lock.json b/sdk/types/package-lock.json index fd8239854d..62e4b245bf 100644 --- a/sdk/types/package-lock.json +++ b/sdk/types/package-lock.json @@ -1,12 +1,12 @@ { "name": "@scrypted/types", - "version": "0.3.67", + "version": "0.3.68", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@scrypted/types", - "version": "0.3.67", + "version": "0.3.68", "license": "ISC", "devDependencies": { "@types/node": "^22.1.0", diff --git a/sdk/types/package.json b/sdk/types/package.json index 4c35c9e44a..5079bcf8e3 100644 --- a/sdk/types/package.json +++ b/sdk/types/package.json @@ -1,6 +1,6 @@ { "name": "@scrypted/types", - "version": "0.3.67", + "version": "0.3.68", "description": "", "main": "dist/index.js", "author": "", diff --git a/sdk/types/src/types.input.ts b/sdk/types/src/types.input.ts index 89f8977065..218da649e3 100644 --- a/sdk/types/src/types.input.ts +++ b/sdk/types/src/types.input.ts @@ -2544,9 +2544,9 @@ export interface ForkWorker { } export interface PluginFork { /** - * The cluster worker id that is executing this fork when in cluster mode. + * The id of the cluster worker that is executing this fork when in cluster mode. */ - workerId?: Promise; + clusterWorkerId?: Promise; result: Promise; worker: ForkWorker; } @@ -2635,9 +2635,9 @@ export interface ForkOptions { nativeId?: ScryptedNativeId; /** - * The worker id that will execute this fork. + * The id of the cluster worker id that will execute this fork. */ - workerId?: string; + clusterWorkerId?: string; /** * The labels used to select the cluster worker that will execute this fork. */ diff --git a/server/.vscode/launch.json b/server/.vscode/launch.json index 0508551754..8ff32a128d 100644 --- a/server/.vscode/launch.json +++ b/server/.vscode/launch.json @@ -94,9 +94,9 @@ "${workspaceFolder}/**/*.js" ], "env": { - // "SCRYPTED_CLUSTER_LABELS": "compute", + "SCRYPTED_CLUSTER_LABELS": "compute", "SCRYPTED_CLUSTER_MODE": "client", - "SCRYPTED_CLUSTER_SERVER": "scrypted-nvr", + "SCRYPTED_CLUSTER_SERVER": "192.168.2.124", "SCRYPTED_CLUSTER_SECRET": "swordfish", "SCRYPTED_CAN_RESTART": "true", "SCRYPTED_VOLUME": "/Users/koush/.scrypted-cluster/volume-client", diff --git a/server/package-lock.json b/server/package-lock.json index cc3aecadb2..43a09594da 100644 --- a/server/package-lock.json +++ b/server/package-lock.json @@ -11,8 +11,8 @@ "license": "ISC", "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", - "@scrypted/node-pty": "^1.0.18", - "@scrypted/types": "^0.3.66", + "@scrypted/node-pty": "^1.0.22", + "@scrypted/types": "^0.3.68", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", @@ -548,18 +548,18 @@ } }, "node_modules/@scrypted/node-pty": { - "version": "1.0.18", - "resolved": "https://registry.npmjs.org/@scrypted/node-pty/-/node-pty-1.0.18.tgz", - "integrity": "sha512-nDOlNn4nb742P60Y8Fk5BXrqyZbW3fsXBi+n0zen3noBxUSwBro0cDLIAI60aAKVD07lJ/LhDEJ+pMUR2WHUEQ==", + "version": "1.0.22", + "resolved": "https://registry.npmjs.org/@scrypted/node-pty/-/node-pty-1.0.22.tgz", + "integrity": "sha512-GXpxrtDkbEG9oFOqJ4kVNT8r0HBzSDzQyVAllAApHTec2NezgKU2wMDK668s0gPW7Q1mvF3g0EV4646cAA0hHg==", "hasInstallScript": true, "dependencies": { "prebuild-install": "npm:@scrypted/prebuild-install@^7.1.8" } }, "node_modules/@scrypted/types": { - "version": "0.3.66", - "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.66.tgz", - "integrity": "sha512-POHpVgW6Ce8mnJRaXZRm+2RtvFuPP+ZehsDrhUqkQdxmnV81m8K2+3M6Vhrt+07kNDXmrznAijoj/OzXkdZWgw==" + "version": "0.3.68", + "resolved": "https://registry.npmjs.org/@scrypted/types/-/types-0.3.68.tgz", + "integrity": "sha512-4kxJZXCLTRGgJG8l+7G8R+lENEhfad0rEouX6zcTcA6JtKRhr6OK4lmLOa7h+yY5tbivHizoAvvOSPQWOKxOww==" }, "node_modules/@types/adm-zip": { "version": "0.5.6", @@ -1294,9 +1294,9 @@ } }, "node_modules/cross-spawn": { - "version": "7.0.3", - "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.3.tgz", - "integrity": "sha512-iRDPJKUPVEND7dHPO8rkbOnPpyDygcDFtWjpeWNCgy8WP2rXcxXL8TskReQl6OrB2G7+UJrags1q15Fudc7G6w==", + "version": "7.0.6", + "resolved": "https://registry.npmjs.org/cross-spawn/-/cross-spawn-7.0.6.tgz", + "integrity": "sha512-uV2QOWP2nWzsy2aMp8aRibhi9dlzF5Hgh5SHaB9OiTGEyDTiJJyx0uy51QXdyWbtAHNua4XJzUKca3OzKUd3vA==", "dependencies": { "path-key": "^3.1.0", "shebang-command": "^2.0.0", diff --git a/server/package.json b/server/package.json index c0035b4cac..a3fc47021f 100644 --- a/server/package.json +++ b/server/package.json @@ -5,7 +5,7 @@ "dependencies": { "@scrypted/ffmpeg-static": "^6.1.0-build3", "@scrypted/node-pty": "^1.0.22", - "@scrypted/types": "^0.3.66", + "@scrypted/types": "^0.3.68", "adm-zip": "^0.5.16", "body-parser": "^1.20.3", "cookie-parser": "^1.4.7", diff --git a/server/src/cluster/cluster-labels.ts b/server/src/cluster/cluster-labels.ts index 89aa6f0808..a033863481 100644 --- a/server/src/cluster/cluster-labels.ts +++ b/server/src/cluster/cluster-labels.ts @@ -36,5 +36,6 @@ export function getClusterLabels() { } export function needsClusterForkWorker(options: ClusterForkOptions) { - return process.env.SCRYPTED_CLUSTER_ADDRESS && options?.runtime && !matchesClusterLabels(options, getClusterLabels()) + return process.env.SCRYPTED_CLUSTER_ADDRESS + && (!matchesClusterLabels(options, getClusterLabels()) || options.clusterWorkerId); } diff --git a/server/src/plugin/plugin-host.ts b/server/src/plugin/plugin-host.ts index 4fd1becb9f..ce7556c14a 100644 --- a/server/src/plugin/plugin-host.ts +++ b/server/src/plugin/plugin-host.ts @@ -379,7 +379,7 @@ export class PluginHost { }); const clusterSetup = setupCluster(this.peer); - const { runtimeWorker, forkPeer } = createClusterForkWorker((async () => { + const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker((async () => { await clusterSetup.initializeCluster({ clusterId: this.scrypted.clusterId, clusterSecret: this.scrypted.clusterSecret, @@ -395,6 +395,9 @@ export class PluginHost { this.peer = peer; peer.killedSafe.finally(() => originalPeer.kill()); }).catch(() => {}); + clusterWorkerId.then(clusterWorkerId => { + console.log('cluster worker id', clusterWorkerId); + }).catch(() => {}); this.worker = runtimeWorker; peer = forkPeer; diff --git a/server/src/plugin/plugin-remote-worker.ts b/server/src/plugin/plugin-remote-worker.ts index d266184c12..d021c93a2d 100644 --- a/server/src/plugin/plugin-remote-worker.ts +++ b/server/src/plugin/plugin-remote-worker.ts @@ -210,10 +210,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe let forkPeer: Promise; let runtimeWorker: RuntimeWorker; let nativeWorker: child_process.ChildProcess | worker_threads.Worker; + let clusterWorkerId: Promise; // if running in a cluster, fork to a matching cluster worker only if necessary. if (needsClusterForkWorker(options)) { - ({ runtimeWorker, forkPeer } = createClusterForkWorker( + ({ runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker( api.getComponent('cluster-fork'), zipHash, () => zipAPI.getZip(), options, packageJson, scrypted.connectRPCObject) ); } @@ -337,6 +338,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe nativeWorker, }; return { + clusterWorkerId, worker, result, }; diff --git a/server/src/plugin/runtime/cluster-fork-worker.ts b/server/src/plugin/runtime/cluster-fork-worker.ts index ce38f1bd52..73dd1054e9 100644 --- a/server/src/plugin/runtime/cluster-fork-worker.ts +++ b/server/src/plugin/runtime/cluster-fork-worker.ts @@ -7,7 +7,7 @@ import { writeWorkerGenerator } from "../plugin-console"; import type { RuntimeWorker } from "./runtime-worker"; export function createClusterForkWorker( - forkComponentPromise: ClusterFork | Promise, + forkComponentPromise: Promise, zipHash: string, getZip: () => Promise, options: ClusterForkOptions, @@ -36,12 +36,15 @@ export function createClusterForkWorker( runtimeWorker.pid = undefined; }); + const peerLiveness = new PeerLiveness(waitKilled.promise); + const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(peerLiveness, options, packageJson, zipHash, getZip)); + clusterForkResultPromise.catch(() => {}); + + const clusterWorkerId = clusterForkResultPromise.then(clusterForkResult => clusterForkResult.clusterWorkerId); + clusterWorkerId.catch(() => {}); + const forkPeer = (async () => { - // need to ensure this happens on next tick to prevent unhandled promise rejection. - // await sleep(0); - const forkComponent = await forkComponentPromise; - const peerLiveness = new PeerLiveness(waitKilled.promise); - const clusterForkResult = await forkComponent.fork(peerLiveness, options, packageJson, zipHash, getZip); + const clusterForkResult = await clusterForkResultPromise; waitKilled.promise.finally(() => { runtimeWorker.pid = undefined; clusterForkResult.kill().catch(() => {}); @@ -83,5 +86,6 @@ export function createClusterForkWorker( return { runtimeWorker, forkPeer, + clusterWorkerId, } } \ No newline at end of file diff --git a/server/src/scrypted-cluster-main.ts b/server/src/scrypted-cluster-main.ts index 8ffee7e691..b2bb0f2ee6 100644 --- a/server/src/scrypted-cluster-main.ts +++ b/server/src/scrypted-cluster-main.ts @@ -61,6 +61,7 @@ export interface ClusterForkOptions { runtime?: ForkOptions['runtime']; labels?: ForkOptions['labels']; id?: ForkOptions['id']; + clusterWorkerId?: ForkOptions['clusterWorkerId']; } type ConnectForkWorker = (auth: ClusterObject, properties: ClusterWorkerProperties) => Promise<{ clusterId: string }>; @@ -85,6 +86,7 @@ export class PeerLiveness { export class ClusterForkResult extends PeerLiveness { [RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = ['kill']; + clusterWorkerId?: string; constructor(private peer: RpcPeer, killed: Promise, private result: any) { super(killed); diff --git a/server/src/services/cluster-fork.ts b/server/src/services/cluster-fork.ts index ac4afa8821..0f5339813a 100644 --- a/server/src/services/cluster-fork.ts +++ b/server/src/services/cluster-fork.ts @@ -1,16 +1,21 @@ import type { ScryptedRuntime } from "../runtime"; import { matchesClusterLabels } from "../cluster/cluster-labels"; import { ClusterForkOptions, ClusterForkParam, ClusterWorker, PeerLiveness } from "../scrypted-cluster-main"; +import { RpcPeer } from "../rpc"; export class ClusterFork { constructor(public runtime: ScryptedRuntime) { } async fork(peerLiveness: PeerLiveness, options: ClusterForkOptions, packageJson: any, zipHash: string, getZip: () => Promise) { - const matchingWorkers = [...this.runtime.clusterWorkers.values()].map(worker => ({ + const matchingWorkers = [...this.runtime.clusterWorkers.entries()].map(([id, worker]) => ({ worker, matches: matchesClusterLabels(options, worker.labels), })) - .filter(({ matches }) => matches); + .filter(({ matches, worker }) => { + // labels must match + // and worker id must match if provided + return matches && (!options.clusterWorkerId || worker.id === options.clusterWorkerId); + }); matchingWorkers.sort((a, b) => b.worker.labels.length - a.worker.labels.length); let worker: ClusterWorker; @@ -23,8 +28,11 @@ export class ClusterFork { // TODO: round robin? worker ||= matchingWorkers[0]?.worker; - if (!worker) + if (!worker) { + if (options.clusterWorkerId) + throw new Error(`no worker found for cluster id ${options.clusterWorkerId}`); throw new Error(`no worker found for cluster labels ${JSON.stringify(options.labels)}`); + } const fork: ClusterForkParam = await worker.peer.getParam('fork'); const forkResult = await fork(peerLiveness, options.runtime, packageJson, zipHash, getZip); @@ -32,6 +40,8 @@ export class ClusterFork { forkResult.waitKilled().catch(() => { }).finally(() => { worker.forks.delete(options); }); + + forkResult.clusterWorkerId = worker.id; return forkResult; } @@ -39,6 +49,7 @@ export class ClusterFork { const ret: any = {}; for (const worker of this.runtime.clusterWorkers.values()) { ret[worker.id] = { + name: worker.peer.peerName, labels: worker.labels, forks: [...worker.forks], };