-
-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathems.py
145 lines (104 loc) · 4.58 KB
/
ems.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
from __future__ import annotations
import json
import logging
from . import SatMqttCoordinator
from ..coordinator import DeviceState
from ..util import float_value
DATA_ON = "on"
DATA_OFF = "off"
DATA_BOILER_DATA = "boiler_data"
DATA_FLAME_ACTIVE = "burngas"
DATA_DHW_SETPOINT = "dhw/seltemp"
DATA_CONTROL_SETPOINT = "selflowtemp"
DATA_REL_MOD_LEVEL = "curburnpow"
DATA_BOILER_TEMPERATURE = "curflowtemp"
DATA_RETURN_TEMPERATURE = "rettemp"
DATA_DHW_ENABLE = "tapwateractive"
DATA_CENTRAL_HEATING = "heatingactive"
DATA_BOILER_CAPACITY = "nompower"
DATA_REL_MIN_MOD_LEVEL = "burnminnpower"
DATA_MAX_REL_MOD_LEVEL_SETTING = "burnmaxpower"
_LOGGER: logging.Logger = logging.getLogger(__name__)
class SatEmsMqttCoordinator(SatMqttCoordinator):
"""Class to manage fetching data from the OTGW Gateway using MQTT."""
@property
def supports_setpoint_management(self) -> bool:
return True
@property
def supports_hot_water_setpoint_management(self) -> bool:
return True
@property
def supports_maximum_setpoint_management(self) -> bool:
return True
@property
def supports_relative_modulation_management(self) -> bool:
return True
@property
def device_active(self) -> bool:
return bool(self.data.get(DATA_CENTRAL_HEATING))
@property
def flame_active(self) -> bool:
return bool(self.data.get(DATA_FLAME_ACTIVE))
@property
def hot_water_active(self) -> bool:
return bool(self.data.get(DATA_DHW_ENABLE))
@property
def setpoint(self) -> float | None:
return float_value(self.data.get(DATA_CONTROL_SETPOINT))
@property
def hot_water_setpoint(self) -> float | None:
return float_value(self.data.get(DATA_DHW_SETPOINT))
@property
def boiler_temperature(self) -> float | None:
return float_value(self.data.get(DATA_BOILER_TEMPERATURE))
@property
def return_temperature(self) -> float | None:
return float_value(self.data.get(DATA_RETURN_TEMPERATURE))
@property
def relative_modulation_value(self) -> float | None:
return float_value(self.data.get(DATA_REL_MOD_LEVEL))
@property
def boiler_capacity(self) -> float | None:
return float_value(self.data.get(DATA_BOILER_CAPACITY))
@property
def minimum_relative_modulation_value(self) -> float | None:
return float_value(self.data.get(DATA_REL_MIN_MOD_LEVEL))
@property
def maximum_relative_modulation_value(self) -> float | None:
return float_value(self.data.get(DATA_MAX_REL_MOD_LEVEL_SETTING))
@property
def member_id(self) -> int | None:
# Not supported (yet)
return None
async def boot(self) -> SatMqttCoordinator:
# Nothing needs to be booted (yet)
return self
def get_tracked_entities(self) -> list[str]:
return [DATA_BOILER_DATA]
async def async_set_control_setpoint(self, value: float) -> None:
await self._publish_command(f'{{"cmd": "selflowtemp", "value": {0 if value == 10 else value}}}')
await super().async_set_control_setpoint(value)
async def async_set_control_hot_water_setpoint(self, value: float) -> None:
await self._publish_command(f'{{"cmd": "dhw/seltemp", "value": {value}}}')
await super().async_set_control_hot_water_setpoint(value)
async def async_set_control_thermostat_setpoint(self, value: float) -> None:
# Not supported (yet)
await super().async_set_control_thermostat_setpoint(value)
async def async_set_heater_state(self, state: DeviceState) -> None:
await self._publish_command(f'{{"cmd": "heatingoff", "value": "{DATA_OFF if state == DeviceState.ON else DATA_ON}"}}')
await super().async_set_heater_state(state)
async def async_set_control_max_relative_modulation(self, value: int) -> None:
await self._publish_command(f'{{"cmd": "burnmaxpower", "value": {value}}}')
await super().async_set_control_max_relative_modulation(value)
async def async_set_control_max_setpoint(self, value: float) -> None:
await self._publish_command(f'{{"cmd": "heatingtemp", "value": {value}}}')
await super().async_set_control_max_setpoint(value)
def _get_topic_for_subscription(self, key: str) -> str:
return f"{self._topic}/{key}"
def _get_topic_for_publishing(self) -> str:
return f"{self._topic}/boiler"
def _process_message_payload(self, key: str, payload):
try:
self.data = json.loads(payload)
except json.JSONDecodeError as error:
_LOGGER.error("Failed to decode JSON payload: %s. Error: %s", payload, error)