-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandlers.py
53 lines (50 loc) · 1.93 KB
/
handlers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import logging
import time
import json
from websockets.asyncio.server import broadcast
from client import get_udp_transport
from storage import Signal, storage, WS_CONNECTIONS
from serializers import serialize_collision_data
# Minimum spacing, in seconds, between two transmitted signals for one side.
_MIN_SEND_INTERVAL_S = 0.2

# Side codename -> name of the ``storage`` attribute holding that side's signal.
_SIDE_TO_STORAGE_ATTR = {'left': 'lsignal', 'right': 'rsignal'}


def _pending_value(signal) -> float:
    """Return ``signal.value`` while the signal is still unsent, else 0.0.

    A signal counts as unsent when it is truthy and its ``sent_at_ts`` is
    falsy.
    """
    if signal and not signal.sent_at_ts:
        return signal.value
    return 0.0


async def _send_data(value: float, side: str):
    """Record a new signal for one side, transmit it, and broadcast it.

    Steps, in order:

    1. Do nothing if ``storage.allow_signal_sending`` is falsy.
    2. Cap ``value`` at ``storage.max_intensity`` (upper bound only).
    3. Drop the call if the previous signal stored for ``side`` was sent
       less than ``_MIN_SEND_INTERVAL_S`` seconds ago; otherwise store a
       fresh ``Signal(value)`` for that side.
    4. Obtain the UDP transport. If one is available, serialize the
       still-unsent left/right values, send them, and stamp every unsent
       signal with the send time. If not, only log that fact.
    5. Broadcast the same left/right values as a JSON ``"signal"`` message
       to ``WS_CONNECTIONS``.

    NOTE(review): the original indentation of this file was lost. Step 5 is
    assumed to run whether or not a transport was available -- confirm.

    Args:
        value: Requested intensity for the given side.
        side: ``'left'`` or ``'right'``.

    Raises:
        ValueError: If ``side`` is neither ``'left'`` nor ``'right'``.
            (``ValueError`` is a subclass of ``Exception``, so callers that
            catch ``Exception`` keep working.)
    """
    if not storage.allow_signal_sending:
        return

    value = min(value, storage.max_intensity)

    attr = _SIDE_TO_STORAGE_ATTR.get(side)
    if attr is None:
        raise ValueError('Invalid side codename')

    # Throttle per side: ignore updates arriving too soon after the last
    # transmitted signal for the same side.
    previous = getattr(storage, attr)
    if previous and time.time() - previous.sent_at_ts < _MIN_SEND_INTERVAL_S:
        return
    # presumably a new Signal starts with sent_at_ts == 0.0 (i.e. unsent);
    # the checks below rely on that -- verify against storage.Signal.
    setattr(storage, attr, Signal(value))

    transport = await get_udp_transport()

    # Read both sides only after the await above, so the values reflect the
    # current storage state. A side whose signal was already sent reports 0.0.
    lvalue = _pending_value(storage.lsignal)
    rvalue = _pending_value(storage.rsignal)

    if transport:
        packet = serialize_collision_data(lvalue, rvalue)
        logging.debug("[OSC Handler] Send pat_%s value: %s", side, value)
        logging.debug("[OSC Handler] Send data: %s", packet)
        transport.sendto(packet)

        # Stamp every signal that was still unsent with the send time.
        now = time.time()
        for signal in (storage.lsignal, storage.rsignal):
            if signal and signal.sent_at_ts == 0.0:
                signal.sent_at_ts = now
    else:
        logging.debug("[OSC Handler] Cannot connect to device to send data")

    message = {
        "type": "signal",
        "value": {
            "left": lvalue,
            "right": rvalue,
        },
    }
    broadcast(WS_CONNECTIONS, json.dumps(message))
async def pat_left_handler(_, value: float):
    """Forward ``value`` to ``_send_data`` for the left side.

    The first positional argument is ignored (presumably the address of the
    incoming message -- confirm against where this handler is registered).
    """
    await _send_data(value, side='left')
async def pat_right_handler(_, value: float):
    """Forward ``value`` to ``_send_data`` for the right side.

    The first positional argument is ignored (presumably the address of the
    incoming message -- confirm against where this handler is registered).
    """
    await _send_data(value, side='right')