Skip to content

Commit

Permalink
server: fork by clusterWorkerId
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Nov 19, 2024
1 parent c1d48e1 commit 23167da
Show file tree
Hide file tree
Showing 14 changed files with 59 additions and 36 deletions.
4 changes: 2 additions & 2 deletions sdk/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/sdk",
"version": "0.3.73",
"version": "0.3.74",
"description": "",
"main": "dist/src/index.js",
"exports": {
Expand Down
4 changes: 2 additions & 2 deletions sdk/types/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/types/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@scrypted/types",
"version": "0.3.67",
"version": "0.3.68",
"description": "",
"main": "dist/index.js",
"author": "",
Expand Down
8 changes: 4 additions & 4 deletions sdk/types/src/types.input.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2544,9 +2544,9 @@ export interface ForkWorker {
}
export interface PluginFork<T> {
/**
* The cluster worker id that is executing this fork when in cluster mode.
* The id of the cluster worker that is executing this fork when in cluster mode.
*/
workerId?: Promise<string>;
clusterWorkerId?: Promise<string>;
result: Promise<T>;
worker: ForkWorker;
}
Expand Down Expand Up @@ -2635,9 +2635,9 @@ export interface ForkOptions {
nativeId?: ScryptedNativeId;

/**
* The worker id that will execute this fork.
* The id of the cluster worker id that will execute this fork.
*/
workerId?: string;
clusterWorkerId?: string;
/**
* The labels used to select the cluster worker that will execute this fork.
*/
Expand Down
4 changes: 2 additions & 2 deletions server/.vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@
"${workspaceFolder}/**/*.js"
],
"env": {
// "SCRYPTED_CLUSTER_LABELS": "compute",
"SCRYPTED_CLUSTER_LABELS": "compute",
"SCRYPTED_CLUSTER_MODE": "client",
"SCRYPTED_CLUSTER_SERVER": "scrypted-nvr",
"SCRYPTED_CLUSTER_SERVER": "192.168.2.124",
"SCRYPTED_CLUSTER_SECRET": "swordfish",
"SCRYPTED_CAN_RESTART": "true",
"SCRYPTED_VOLUME": "/Users/koush/.scrypted-cluster/volume-client",
Expand Down
22 changes: 11 additions & 11 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"dependencies": {
"@scrypted/ffmpeg-static": "^6.1.0-build3",
"@scrypted/node-pty": "^1.0.22",
"@scrypted/types": "^0.3.66",
"@scrypted/types": "^0.3.68",
"adm-zip": "^0.5.16",
"body-parser": "^1.20.3",
"cookie-parser": "^1.4.7",
Expand Down
3 changes: 2 additions & 1 deletion server/src/cluster/cluster-labels.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,6 @@ export function getClusterLabels() {
}

export function needsClusterForkWorker(options: ClusterForkOptions) {
return process.env.SCRYPTED_CLUSTER_ADDRESS && options?.runtime && !matchesClusterLabels(options, getClusterLabels())
return process.env.SCRYPTED_CLUSTER_ADDRESS
&& (!matchesClusterLabels(options, getClusterLabels()) || options.clusterWorkerId);
}
5 changes: 4 additions & 1 deletion server/src/plugin/plugin-host.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ export class PluginHost {
});

const clusterSetup = setupCluster(this.peer);
const { runtimeWorker, forkPeer } = createClusterForkWorker((async () => {
const { runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker((async () => {
await clusterSetup.initializeCluster({
clusterId: this.scrypted.clusterId,
clusterSecret: this.scrypted.clusterSecret,
Expand All @@ -395,6 +395,9 @@ export class PluginHost {
this.peer = peer;
peer.killedSafe.finally(() => originalPeer.kill());
}).catch(() => {});
clusterWorkerId.then(clusterWorkerId => {
console.log('cluster worker id', clusterWorkerId);
}).catch(() => {});

this.worker = runtimeWorker;
peer = forkPeer;
Expand Down
4 changes: 3 additions & 1 deletion server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,11 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
let forkPeer: Promise<RpcPeer>;
let runtimeWorker: RuntimeWorker;
let nativeWorker: child_process.ChildProcess | worker_threads.Worker;
let clusterWorkerId: Promise<string>;

// if running in a cluster, fork to a matching cluster worker only if necessary.
if (needsClusterForkWorker(options)) {
({ runtimeWorker, forkPeer } = createClusterForkWorker(
({ runtimeWorker, forkPeer, clusterWorkerId } = createClusterForkWorker(
api.getComponent('cluster-fork'), zipHash, () => zipAPI.getZip(), options, packageJson, scrypted.connectRPCObject)
);
}
Expand Down Expand Up @@ -337,6 +338,7 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe
nativeWorker,
};
return {
clusterWorkerId,
worker,
result,
};
Expand Down
16 changes: 10 additions & 6 deletions server/src/plugin/runtime/cluster-fork-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { writeWorkerGenerator } from "../plugin-console";
import type { RuntimeWorker } from "./runtime-worker";

export function createClusterForkWorker(
forkComponentPromise: ClusterFork | Promise<ClusterFork>,
forkComponentPromise: Promise<ClusterFork>,
zipHash: string,
getZip: () => Promise<Buffer>,
options: ClusterForkOptions,
Expand Down Expand Up @@ -36,12 +36,15 @@ export function createClusterForkWorker(
runtimeWorker.pid = undefined;
});

const peerLiveness = new PeerLiveness(waitKilled.promise);
const clusterForkResultPromise = forkComponentPromise.then(forkComponent => forkComponent.fork(peerLiveness, options, packageJson, zipHash, getZip));
clusterForkResultPromise.catch(() => {});

const clusterWorkerId = clusterForkResultPromise.then(clusterForkResult => clusterForkResult.clusterWorkerId);
clusterWorkerId.catch(() => {});

const forkPeer = (async () => {
// need to ensure this happens on next tick to prevent unhandled promise rejection.
// await sleep(0);
const forkComponent = await forkComponentPromise;
const peerLiveness = new PeerLiveness(waitKilled.promise);
const clusterForkResult = await forkComponent.fork(peerLiveness, options, packageJson, zipHash, getZip);
const clusterForkResult = await clusterForkResultPromise;
waitKilled.promise.finally(() => {
runtimeWorker.pid = undefined;
clusterForkResult.kill().catch(() => {});
Expand Down Expand Up @@ -83,5 +86,6 @@ export function createClusterForkWorker(
return {
runtimeWorker,
forkPeer,
clusterWorkerId,
}
}
2 changes: 2 additions & 0 deletions server/src/scrypted-cluster-main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export interface ClusterForkOptions {
runtime?: ForkOptions['runtime'];
labels?: ForkOptions['labels'];
id?: ForkOptions['id'];
clusterWorkerId?: ForkOptions['clusterWorkerId'];
}

type ConnectForkWorker = (auth: ClusterObject, properties: ClusterWorkerProperties) => Promise<{ clusterId: string }>;
Expand All @@ -85,6 +86,7 @@ export class PeerLiveness {

export class ClusterForkResult extends PeerLiveness {
[RpcPeer.PROPERTY_PROXY_ONEWAY_METHODS] = ['kill'];
clusterWorkerId?: string;

constructor(private peer: RpcPeer, killed: Promise<any>, private result: any) {
super(killed);
Expand Down
17 changes: 14 additions & 3 deletions server/src/services/cluster-fork.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
import type { ScryptedRuntime } from "../runtime";
import { matchesClusterLabels } from "../cluster/cluster-labels";
import { ClusterForkOptions, ClusterForkParam, ClusterWorker, PeerLiveness } from "../scrypted-cluster-main";
import { RpcPeer } from "../rpc";

export class ClusterFork {
constructor(public runtime: ScryptedRuntime) { }

async fork(peerLiveness: PeerLiveness, options: ClusterForkOptions, packageJson: any, zipHash: string, getZip: () => Promise<Buffer>) {
const matchingWorkers = [...this.runtime.clusterWorkers.values()].map(worker => ({
const matchingWorkers = [...this.runtime.clusterWorkers.entries()].map(([id, worker]) => ({
worker,
matches: matchesClusterLabels(options, worker.labels),
}))
.filter(({ matches }) => matches);
.filter(({ matches, worker }) => {
// labels must match
// and worker id must match if provided
return matches && (!options.clusterWorkerId || worker.id === options.clusterWorkerId);
});
matchingWorkers.sort((a, b) => b.worker.labels.length - a.worker.labels.length);

let worker: ClusterWorker;
Expand All @@ -23,22 +28,28 @@ export class ClusterFork {
// TODO: round robin?
worker ||= matchingWorkers[0]?.worker;

if (!worker)
if (!worker) {
if (options.clusterWorkerId)
throw new Error(`no worker found for cluster id ${options.clusterWorkerId}`);
throw new Error(`no worker found for cluster labels ${JSON.stringify(options.labels)}`);
}

const fork: ClusterForkParam = await worker.peer.getParam('fork');
const forkResult = await fork(peerLiveness, options.runtime, packageJson, zipHash, getZip);
worker.forks.add(options);
forkResult.waitKilled().catch(() => { }).finally(() => {
worker.forks.delete(options);
});

forkResult.clusterWorkerId = worker.id;
return forkResult;
}

async getClusterWorkers() {
const ret: any = {};
for (const worker of this.runtime.clusterWorkers.values()) {
ret[worker.id] = {
name: worker.peer.peerName,
labels: worker.labels,
forks: [...worker.forks],
};
Expand Down

0 comments on commit 23167da

Please sign in to comment.