-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathopentherm_rp2.py
124 lines (103 loc) · 3.78 KB
/
opentherm_rp2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import machine
import rp2
import time
from lib import manchester_encode, frame_encode, manchester_decode, frame_decode
import asyncio
# opentherm tx - transmit pre-manchester-encoded bits. Automatically sends the start and stop bits.
# Note that the hardware is inverted for transmission - HIGH = 0, LOW = 1.
# Every half-bit below is one pin write followed by one filler instruction, i.e. two
# state machine ticks (500uS at the 250uS tick the state machine is created with).
# The payload comes from the TX FIFO via autopull and is shifted out with SHIFT_LEFT.
@rp2.asm_pio(autopush=True, set_init=rp2.PIO.OUT_HIGH, out_init=rp2.PIO.OUT_HIGH, autopull=True, out_shiftdir=rp2.PIO.SHIFT_LEFT)
def opentherm_tx():
    # counter for the bit writing loop; each pass writes two half-bits, and the
    # Python side queues two 32 bit words, so x = 31 is expected to cover all 64 half-bits
    set(x, 31)
    # write start bit (hardware bits are inverted!)
    set(pins, 0)
    nop()
    set(pins, 1)
    nop()
    # write the manchester encoded data
    label("write_bits")
    # first half of the bit, held for two ticks by the nop
    out(pins, 1)
    nop()
    # second half of the bit; the jmp below doubles as its second tick
    out(pins, 1)
    jmp(x_dec, "write_bits")
    # write stop bit (hardware bits are inverted!)
    set(pins, 0)
    nop()
    set(pins, 1)
    nop()
    # indicate that we're done: shifting 32 bits into the ISR with autopush enabled
    # makes a word appear in the RX FIFO, which the Python side polls for.
    # The value itself (whatever is left in x) is not used.
    in_(x, 32)
    # park here until the state machine is restarted from Python
    label("loop")
    jmp("loop")
# Transmit state machine: runs opentherm_tx at 4000 Hz, so each tick is 250uS.
# Pin 0 is used both as the `set` base and as the `out` base.
sm_opentherm_tx = rp2.StateMachine(
    0,
    opentherm_tx,
    freq=4000,
    set_base=machine.Pin(0),
    out_base=machine.Pin(0),
)
# opentherm rx - receive manchester-encoded bits, waiting for the initial start bit.
# The input pin is sampled into the ISR each time "read_next_bit" is reached, which
# happens either when the pin level changes or when the x countdown runs out without
# a change (presumably two consecutive half-bits at the same level - TODO confirm).
# Autopush moves the shifted-in bits to the RX FIFO; shift direction is SHIFT_LEFT.
@rp2.asm_pio(autopush=True, in_shiftdir=rp2.PIO.SHIFT_LEFT)
def opentherm_rx():
    # wait for start bit: pin goes to 1, then back to 0
    wait(1, pin, 0)
    wait(0, pin, 0)
    # kick off initial bit read; the pin is 0 at this point, so enter the "currently 0" wait
    # NOTE(review): the original comment here read "13 loops, 3 ticks per loop,
    # 60kHz == 650uS", but x is loaded with 14 - confirm the intended timeout.
    set(x, 14)
    jmp("wait_for_bit_currently_0")
    # read the current bit from the GPIO
    label("read_next_bit")
    in_(pins, 1)
    # reload the timeout counter for the next half-bit (same review note as above applies)
    set(x, 14)
    # pick the wait loop matching the level just sampled; falls through when the pin is 0
    jmp(pin, "wait_for_bit_currently_1")
    # wait for a bit change from 0->1 or timeout
    # (three instructions per pass: nop, jmp pin, jmp x_dec)
    label("wait_for_bit_currently_0")
    nop()
    jmp(pin, "read_next_bit")
    jmp(x_dec, "wait_for_bit_currently_0")
    # countdown expired with the pin still 0: sample anyway
    jmp("read_next_bit")
    # wait for a bit change from 1->0 or timeout
    # NOTE(review): this loop is two instructions per pass (jmp pin, jmp x_dec) versus
    # three in the "currently 0" loop, so the two timeouts differ - confirm this is intended.
    label("wait_for_bit_currently_1")
    jmp(pin, "was_still_1")
    # pin dropped to 0: sample the new level
    jmp("read_next_bit")
    label("was_still_1")
    jmp(x_dec, "wait_for_bit_currently_1")
    # countdown expired with the pin still 1: sample anyway
    jmp("read_next_bit")
# Receive state machine: runs opentherm_rx at 60000 Hz, so each tick is 17uS.
# Pin 1 is used both as the `in` base and as the pin tested by `jmp(pin, ...)`.
sm_opentherm_rx = rp2.StateMachine(
    4,
    opentherm_rx,
    freq=60000,
    in_base=machine.Pin(1),
    jmp_pin=machine.Pin(1),
)
async def opentherm_exchange(msg_type: int, data_id: int, data_value: int, timeout_ms: int = 1000, debug: bool=False) -> tuple[int, int, int]:
    """Send one frame through the tx state machine and return the decoded reply.

    The request is built with ``frame_encode`` and ``manchester_encode``,
    clocked out by ``sm_opentherm_tx``, and the reply is collected from
    ``sm_opentherm_rx`` as two 32 bit words, then passed through
    ``manchester_decode`` and ``frame_decode``.

    Args:
        msg_type: Passed to ``frame_encode`` as the message type.
        data_id: Passed to ``frame_encode`` as the data id.
        data_value: Passed to ``frame_encode`` as the data value.
        timeout_ms: Maximum time to wait for the two reply words, in
            milliseconds, measured from the moment the receiver is enabled.
        debug: When true, print the raw frame and manchester bits of both
            the request and the reply.

    Returns:
        The result of ``frame_decode`` applied to the decoded reply frame.

    Raises:
        Exception: If fewer than two words were received before the timeout.
        Any exception raised by ``manchester_decode`` or ``frame_decode``
        propagates unchanged.
    """
    f = frame_encode(msg_type, data_id, data_value)
    m = manchester_encode(f, invert=True)
    if debug:
        print(f"> {f:08x} {(m >> 48) & 0xffff:016b} {(m >> 32) & 0xffff:016b} {(m >> 16) & 0xffff:016b} {m & 0xffff:016b}")

    # setup pio: stop both state machines and discard anything left in their RX FIFOs
    sm_opentherm_tx.active(0)
    sm_opentherm_rx.active(0)
    while sm_opentherm_rx.rx_fifo():
        sm_opentherm_rx.get()
    while sm_opentherm_tx.rx_fifo():
        sm_opentherm_tx.get()

    # send the data using the transmitter pio: high word first, then low word
    sm_opentherm_tx.put(int(m) >> 32)
    sm_opentherm_tx.put(int(m))
    sm_opentherm_tx.restart()
    sm_opentherm_tx.active(1)

    # wait for the pio to finish - the tx program pushes one word when it is done,
    # and we don't care about its value. NOTE: this wait has no timeout of its own.
    while not sm_opentherm_tx.rx_fifo():
        await asyncio.sleep_ms(1)
    sm_opentherm_tx.get()
    sm_opentherm_tx.active(0)

    # wait for response
    sm_opentherm_rx.restart()
    sm_opentherm_rx.active(1)
    # ticks_ms() wraps around, so the deadline must be computed and compared with
    # ticks_add()/ticks_diff(); plain + and < give wrong results near the wrap point.
    deadline = time.ticks_add(time.ticks_ms(), timeout_ms)
    while sm_opentherm_rx.rx_fifo() < 2 and time.ticks_diff(deadline, time.ticks_ms()) > 0:
        await asyncio.sleep_ms(10)
    sm_opentherm_rx.active(0)

    # check we didn't time out
    if sm_opentherm_rx.rx_fifo() < 2:
        raise Exception("Timeout waiting for response")

    # decode it: first word received is the high half of the 64 manchester bits
    a = sm_opentherm_rx.get()
    b = sm_opentherm_rx.get()
    m2 = (a << 32) | b
    f2 = 0
    try:
        f2 = manchester_decode(m2)
    finally:
        # print the raw bits even when decoding failed (f2 is then still 0)
        if debug:
            print(f"< {f2:08x} {(m2 >> 48) & 0xffff:016b} {(m2 >> 32) & 0xffff:016b} {(m2 >> 16) & 0xffff:016b} {m2 & 0xffff:016b}")
    return frame_decode(f2)