-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcode.py
57 lines (42 loc) · 1.53 KB
/
code.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from XRPLib.defaults import drivetrain, servo_one
import wifi
import time
from ws_connection import ClientClosedError
from ws_server import WebSocketServer, WebSocketClient
deadline = time.ticks_add(time.ticks_ms(), 300)
class TestClient(WebSocketClient):
def __init__(self, conn):
super().__init__(conn)
def process(self):
try:
msg = self.connection.read()
if not msg:
return
msg = msg.decode("utf-8")
msg = msg.split("\n")[-2]
msg = msg.split(" ")
deadline = time.ticks_add(time.ticks_ms(), 300)
# tires.apply_power(int(msg[0]), int(msg[1]), int(msg[2]), int(msg[3]))
print("driving", float(msg[0]), float(msg[1]), float(msg[2]))
drivetrain.set_effort(float(msg[0]), float(msg[1]))
servo_one.set_angle(float(msg[2]))
except ClientClosedError:
print("Connection close error")
self.connection.close()
except Exception as e:
print("exception:" + str(e) + "\n")
raise e
class TestServer(WebSocketServer):
def __init__(self):
super().__init__("index.html", 100)
def _make_client(self, conn):
return TestClient(conn)
wifi.run()
server = TestServer()
server.start()
while True:
server.process_all()
if time.ticks_diff(deadline, time.ticks_ms()) < 0:
drivetrain.stop()
deadline = time.ticks_add(time.ticks_ms(), 100000)
server.stop()