-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathheartbeat.py
62 lines (50 loc) · 1.62 KB
/
heartbeat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# The MIT License (MIT)
# Copyright (c) 2023 Edgaras Janušauskas and Inovatorius MB (www.fildz.com)
################################################################################
# FILDZ CYBEROS HEARTBEAT
#
# Send a periodic ping to paired cyberware (sending is not implemented yet; see the TODO below).
# TODO:
# 1. Create task to send pings.
# 2. Once pong received, update the web interface.
import uasyncio as asyncio
from uasyncio import Event
import fildz_cyberos as cyberos
class Heartbeat:
    """Own the ping/pong events and the background tasks that service them.

    Creating an instance schedules three tasks with ``asyncio.create_task``:
    one that consumes ``on_ping``, one that consumes ``on_pong`` and one that
    publishes both events into the ``cyberos.cyberwares`` table.
    """

    def __init__(self):
        self._on_ping = Event()
        self._on_pong = Event()

        # Start order matters only in that it mirrors scheduling order:
        # both consumers first, then the task that publishes the events.
        for starter in (self._event_ping, self._event_pong, self._push):
            asyncio.create_task(starter())

    ############################################################################
    # Events
    #

    @property
    def on_ping(self):
        """Read-only access to the event awaited by ``_event_ping``."""
        return self._on_ping

    @property
    def on_pong(self):
        """Read-only access to the event awaited by ``_event_pong``."""
        return self._on_pong

    ############################################################################
    # Tasks
    #

    async def _event_ping(self):
        """Run forever: wait until ``on_ping`` is set, then clear it again.

        Per the file's own comments the event marks a received ping; nothing
        else is done with it yet.
        """
        while True:
            await self._on_ping.wait()
            self._on_ping.clear()
            # Debug aid kept from the original:
            # print('Ping from', cyberos.event.sender)

    async def _event_pong(self):
        """Run forever: wait until ``on_pong`` is set, then clear it again.

        Per the file's own comments the event marks a received pong; nothing
        else is done with it yet.
        """
        while True:
            await self._on_pong.wait()
            self._on_pong.clear()
            # Debug aid kept from the original:
            # print('Pong from', cyberos.event.sender)

    async def _push(self):
        """Publish both events under the ``'events'`` mapping of one cyberware.

        The target entry is ``cyberos.cyberwares[cyberos.network.ap_ssid]``
        (presumably this device's own entry -- confirm against cyberos).
        """
        registry = cyberos.cyberwares[cyberos.network.ap_ssid]['events']
        registry.update({
            'on_ping': self._on_ping,
            'on_pong': self._on_pong,
        })