Skip to content

Commit

Permalink
Merge branch 'main' of github.com:koush/scrypted
Browse files Browse the repository at this point in the history
  • Loading branch information
koush committed Feb 3, 2025
2 parents d444c4a + 590f955 commit 5d213a4
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
7 changes: 4 additions & 3 deletions plugins/core/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { MediaCore } from './media-core';
import { checkLegacyLxc, checkLxc } from './platform/lxc';
import { ConsoleServiceNativeId, PluginSocketService, ReplServiceNativeId } from './plugin-socket-service';
import { ScriptCore, ScriptCoreNativeId, newScript } from './script-core';
import { TerminalService, TerminalServiceNativeId } from './terminal-service';
import { TerminalService, TerminalServiceNativeId, newTerminalService } from './terminal-service';
import { UsersCore, UsersNativeId } from './user';
import { ClusterCore, ClusterCoreNativeId } from './cluster';

Expand Down Expand Up @@ -140,7 +140,7 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Dev
{
name: 'Terminal Service',
nativeId: TerminalServiceNativeId,
interfaces: [ScryptedInterface.StreamService, ScryptedInterface.TTY],
interfaces: [ScryptedInterface.StreamService, ScryptedInterface.TTY, ScryptedInterface.ClusterForkInterface],
type: ScryptedDeviceType.Builtin,
},
);
Expand Down Expand Up @@ -242,7 +242,7 @@ class ScryptedCore extends ScryptedDeviceBase implements HttpRequestHandler, Dev
if (nativeId === UsersNativeId)
return this.users ||= new UsersCore();
if (nativeId === TerminalServiceNativeId)
return this.terminalService ||= new TerminalService();
return this.terminalService ||= new TerminalService(TerminalServiceNativeId, false);
if (nativeId === ReplServiceNativeId)
return this.replService ||= new PluginSocketService(ReplServiceNativeId, 'repl');
if (nativeId === ConsoleServiceNativeId)
Expand Down Expand Up @@ -331,5 +331,6 @@ export async function fork() {
return {
tsCompile,
newScript,
newTerminalService,
}
}
62 changes: 59 additions & 3 deletions plugins/core/src/terminal-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import sdk, { ScryptedDeviceBase, ScryptedInterface, ScryptedNativeId, StreamService, TTYSettings } from "@scrypted/sdk";
import sdk, { ClusterForkInterface, ClusterForkInterfaceOptions, ScryptedDeviceBase, ScryptedInterface, ScryptedNativeId, StreamService, TTYSettings } from "@scrypted/sdk";
import type { IPty, spawn as ptySpawn } from 'node-pty';
import { createAsyncQueue } from '@scrypted/common/src/async-queue'
import { ChildProcess, spawn as childSpawn } from "child_process";
Expand Down Expand Up @@ -111,8 +111,11 @@ class NoninteractiveTerminal {
}


export class TerminalService extends ScryptedDeviceBase implements StreamService<Buffer | string, Buffer> {
constructor(nativeId?: ScryptedNativeId) {
export class TerminalService extends ScryptedDeviceBase implements StreamService<Buffer | string, Buffer>, ClusterForkInterface {
private forks: { [clusterWorkerId: string]: TerminalService } = {};
private forkClients: 0;

constructor(nativeId?: ScryptedNativeId, private isFork: boolean = false) {
super(nativeId);
}

Expand All @@ -134,6 +137,42 @@ export class TerminalService extends ScryptedDeviceBase implements StreamService
return extraPaths;
}

async forkInterface<StreamService>(forkInterface: ScryptedInterface, options?: ClusterForkInterfaceOptions): Promise<StreamService> {
if (forkInterface !== ScryptedInterface.StreamService) {
throw new Error('can only fork StreamService');
}

if (!options?.clusterWorkerId) {
throw new Error('clusterWorkerId required');
}

if (this.isFork) {
throw new Error('cannot fork a fork');
}

const clusterWorkerId = options.clusterWorkerId;
if (this.forks[clusterWorkerId]) {
return this.forks[clusterWorkerId] as StreamService;
}

const fork = sdk.fork<{
newTerminalService: typeof newTerminalService,
}>({ clusterWorkerId });
try {
const result = await fork.result;
const terminalService = await result.newTerminalService();
this.forks[clusterWorkerId] = terminalService;
fork.worker.on('exit', () => {
delete this.forks[clusterWorkerId];
});
return terminalService as StreamService;
}
catch (e) {
fork.worker.terminate();
throw e;
}
}

/*
* The input to this stream can send buffers for normal terminal data and strings
* for control messages. Control messages are JSON-formatted.
Expand All @@ -149,6 +188,19 @@ export class TerminalService extends ScryptedDeviceBase implements StreamService
const queue = createAsyncQueue<Buffer>();
const extraPaths = await this.getExtraPaths();

if (this.isFork) {
this.forkClients++;
}

queue.endPromise.then(() => {
if (this.isFork) {
this.forkClients--;
if (this.forkClients === 0) {
process.exit();
}
}
});

function registerChildListeners() {
cp.onExit(() => queue.end());

Expand Down Expand Up @@ -232,4 +284,8 @@ export class TerminalService extends ScryptedDeviceBase implements StreamService

return generator();
}
}

export async function newTerminalService(): Promise<TerminalService> {
return new TerminalService(TerminalServiceNativeId, true);
}

0 comments on commit 5d213a4

Please sign in to comment.