Skip to content

Commit

Permalink
server: start watchdog/stats after plugin dependency installation com…
Browse files Browse the repository at this point in the history
…pletes
  • Loading branch information
koush committed Mar 25, 2023
1 parent d6560fb commit 99995ea
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 24 deletions.
4 changes: 2 additions & 2 deletions server/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 14 additions & 16 deletions server/python/plugin_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ class PluginRemote:
consoles: Mapping[str, Future[Tuple[StreamReader, StreamWriter]]] = {}

def __init__(self, peer: rpc.RpcPeer, api, pluginId, hostInfo, loop: AbstractEventLoop):
self.allMemoryStats = {}
self.peer = peer
self.api = api
self.pluginId = pluginId
Expand Down Expand Up @@ -446,6 +447,8 @@ def onProxySerialization(value: Any, proxyId: str):
self.deviceManager = DeviceManager(self.nativeIds, self.systemManager)
self.mediaManager = MediaManager(await self.api.getMediaManager())

await self.start_stats_runner()

try:
from scrypted_sdk import sdk_init2 # type: ignore

Expand Down Expand Up @@ -479,7 +482,7 @@ async def getFork():
forkPeer.peerName = 'thread'

async def updateStats(stats):
allMemoryStats[forkPeer] = stats
self.allMemoryStats[forkPeer] = stats
forkPeer.params['updateStats'] = updateStats

async def forkReadLoop():
Expand All @@ -489,7 +492,7 @@ async def forkReadLoop():
# traceback.print_exc()
print('fork read loop exited')
finally:
allMemoryStats.pop(forkPeer)
self.allMemoryStats.pop(forkPeer)
parent_conn.close()
rpcTransport.executor.shutdown()
asyncio.run_coroutine_threadsafe(forkReadLoop(), loop=self.loop)
Expand Down Expand Up @@ -580,16 +583,8 @@ async def createDeviceState(self, id, setState):
async def getServicePort(self, name):
pass


allMemoryStats = {}

async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, rpcTransport)
peer.params['print'] = print
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop)

async def get_update_stats():
update_stats = await peer.getParam('updateStats')
async def start_stats_runner(self):
update_stats = await self.peer.getParam('updateStats')
if not update_stats:
print('host did not provide update_stats')
return
Expand All @@ -608,7 +603,7 @@ def stats_runner():
except:
heapTotal = 0

for _, stats in allMemoryStats.items():
for _, stats in self.allMemoryStats.items():
ptime += stats['cpu']['user']
heapTotal += stats['memoryUsage']['heapTotal']

Expand All @@ -621,12 +616,15 @@ def stats_runner():
'heapTotal': heapTotal,
},
}
asyncio.run_coroutine_threadsafe(update_stats(stats), loop)
loop.call_later(10, stats_runner)
asyncio.run_coroutine_threadsafe(update_stats(stats), self.loop)
self.loop.call_later(10, stats_runner)

stats_runner()

asyncio.run_coroutine_threadsafe(get_update_stats(), loop)
async def plugin_async_main(loop: AbstractEventLoop, rpcTransport: rpc_reader.RpcTransport):
peer, readLoop = await rpc_reader.prepare_peer_readloop(loop, rpcTransport)
peer.params['print'] = print
peer.params['getRemote'] = lambda api, pluginId, hostInfo: PluginRemote(peer, api, pluginId, hostInfo, loop)

try:
await readLoop()
Expand Down
12 changes: 6 additions & 6 deletions server/src/plugin/plugin-remote-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe

const { getDeviceConsole, getMixinConsole } = prepareConsoles(() => peer.selfName, () => systemManager, () => deviceManager, getPlugins);

// process.cpuUsage is for the entire process.
// process.memoryUsage is per thread.
const allMemoryStats = new Map<NodeThreadWorker, NodeJS.MemoryUsage>();

peer.getParam('updateStats').then(updateStats => startStatsUpdater(allMemoryStats, updateStats));

let replPort: Promise<number>;

let _pluginConsole: Console;
Expand Down Expand Up @@ -240,6 +234,12 @@ export function startPluginRemote(mainFilename: string, pluginId: string, peerSe

await installOptionalDependencies(getPluginConsole(), packageJson);

// process.cpuUsage is for the entire process.
// process.memoryUsage is per thread.
const allMemoryStats = new Map<NodeThreadWorker, NodeJS.MemoryUsage>();
// start the stats updater/watchdog after installation has finished, as that may take some time.
peer.getParam('updateStats').then(updateStats => startStatsUpdater(allMemoryStats, updateStats));

const main = pluginReader('main.nodejs.js');
pluginReader = undefined;
const script = main.toString();
Expand Down

0 comments on commit 99995ea

Please sign in to comment.