-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathhttpserver_websocket.py
112 lines (79 loc) · 2.92 KB
/
httpserver_websocket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# SPDX-FileCopyrightText: 2023 Michał Pokusa
#
# SPDX-License-Identifier: Unlicense
from asyncio import create_task, gather, run, sleep as async_sleep
import board
import microcontroller
import neopixel
import socketpool
import wifi
from adafruit_httpserver import Server, Request, Response, Websocket, GET
# Socket pool built on the board's Wi-Fi radio; handed to the HTTP server below.
pool = socketpool.SocketPool(wifi.radio)
# HTTP server instance used to register the routes in this file (debug enabled).
server = Server(pool, debug=True)
# One NeoPixel on the pin exposed as board.NEOPIXEL; its color is set from websocket messages.
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)
# The most recently connected websocket client, or None until connect_client() assigns one.
websocket: Websocket = None
# Page served by the "/client" route. Its script opens a WebSocket to
# "/connect-websocket" on the same host, writes every received message into the
# <strong> element (the CPU temperature readout), and sends the color picker's
# value back to the board, debounced by 200 ms.
HTML_TEMPLATE = """
<html lang="en">
<head>
<title>Websocket Client</title>
</head>
<body>
<p>CPU temperature: <strong>-</strong>°C</p>
<p>NeoPixel Color: <input type="color"></p>
<script>
const cpuTemp = document.querySelector('strong');
const colorPicker = document.querySelector('input[type="color"]');
let ws = new WebSocket('ws://' + location.host + '/connect-websocket');
ws.onopen = () => console.log('WebSocket connection opened');
ws.onclose = () => console.log('WebSocket connection closed');
ws.onmessage = event => cpuTemp.textContent = event.data;
ws.onerror = error => cpuTemp.textContent = error;
colorPicker.oninput = debounce(() => ws.send(colorPicker.value), 200);
function debounce(callback, delay = 1000) {
let timeout
return (...args) => {
clearTimeout(timeout)
timeout = setTimeout(() => {
callback(...args)
}, delay)
}
}
</script>
</body>
</html>
"""
@server.route("/client", GET)
def client(request: Request):
    """Serve the websocket demo page (``HTML_TEMPLATE``) as ``text/html``."""
    page = Response(request, HTML_TEMPLATE, content_type="text/html")
    return page
@server.route("/connect-websocket", GET)
def connect_client(request: Request):
    """Wrap the incoming request in a ``Websocket`` and return it as the response.

    Only one client is tracked at a time: a previously stored connection is
    closed before the module-level ``websocket`` is replaced with the new one.
    """
    global websocket  # pylint: disable=global-statement

    previous = websocket
    if previous is not None:
        # Drop the old client so only the newest connection is served.
        previous.close()

    connection = Websocket(request)
    websocket = connection
    return connection
# Start the HTTP server, passing the Wi-Fi radio's IPv4 address as a string.
server.start(str(wifi.radio.ipv4_address))
async def handle_http_requests():
    """Poll the HTTP server forever, yielding to the other tasks after each poll."""
    while True:
        server.poll()
        # A zero-second sleep gives the other coroutines a turn between polls.
        await async_sleep(0)
def _parse_hex_color(data):
    """Convert a ``#rrggbb`` message into an ``(r, g, b)`` tuple.

    Characters 1-2, 3-4 and 5-6 of ``data`` are read as hexadecimal channel
    values. Returns ``None`` when any of them is missing or is not valid
    hexadecimal, instead of raising.
    """
    try:
        return int(data[1:3], 16), int(data[3:5], 16), int(data[5:7], 16)
    except ValueError:
        return None


async def handle_websocket_requests():
    """Apply color messages from the connected websocket client to the NeoPixel.

    Runs forever. On every pass, if a client is connected and a message is
    available, the message is parsed as ``#rrggbb`` and the pixel is filled
    with that color. Malformed messages are reported and ignored: an uncaught
    ``ValueError`` here would end this task and, through ``gather``, stop the
    whole program.
    """
    while True:
        if websocket is not None:
            data = websocket.receive(fail_silently=True)
            if data is not None:
                color = _parse_hex_color(data)
                if color is None:
                    print("Ignoring malformed color message:", repr(data))
                else:
                    pixel.fill(color)
        # Always yield, even when nothing was received, so other tasks can run.
        await async_sleep(0)
async def send_websocket_messages():
    """Send the CPU temperature to the connected websocket client once a second.

    The reading is rounded to two decimals and sent as text. Nothing is sent
    while no client is connected.
    """
    while True:
        client_socket = websocket
        if client_socket is not None:
            reading = str(round(microcontroller.cpu.temperature, 2))
            client_socket.send_message(reading, fail_silently=True)
        await async_sleep(1)
async def main():
    """Run the HTTP poller, websocket receiver and websocket sender concurrently."""
    workers = (
        handle_http_requests,
        handle_websocket_requests,
        send_websocket_messages,
    )
    # Schedule each worker as its own task, in the order listed above.
    tasks = [create_task(worker()) for worker in workers]
    await gather(*tasks)
# Entry point: run main() on the asyncio event loop.
run(main())