-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathinverter_profiles.py
425 lines (368 loc) · 16.2 KB
/
inverter_profiles.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
"""Defines the different inverter models and connection types"""
import functools
import logging
import re
from dataclasses import dataclass
from typing import Any
from typing import ClassVar
from homeassistant.helpers.entity import Entity
from .common.entity_controller import EntityController
from .common.types import ConnectionType
from .common.types import Inv
from .common.types import InverterModel
from .common.types import RegisterType
from .const import INVERTER_BASE
from .const import INVERTER_CONN
from .const import INVERTER_VERSION
from .entities.charge_period_descriptions import CHARGE_PERIODS
from .entities.entity_descriptions import ENTITIES
from .entities.modbus_charge_period_config import ModbusChargePeriodInfo
from .entities.modbus_remote_control_config import ModbusRemoteControlAddressConfig
from .entities.remote_control_description import REMOTE_CONTROL_DESCRIPTION
_LOGGER = logging.getLogger(__package__)
@functools.total_ordering
class Version:
def __init__(self, major: int, minor: int) -> None:
self.major = major
self.minor = minor
@staticmethod
def parse(version: str) -> "Version":
match = re.fullmatch(r"(\d+)\.(\d+)", version)
if match is None:
raise ValueError(f"Version {version} is not a valid version")
return Version(int(match[1]), int(match[2]))
def __eq__(self, other: object) -> bool:
return isinstance(other, Version) and self.major == other.major and self.minor == other.minor
def __hash__(self) -> int:
return hash((self.major, self.minor))
def __lt__(self, other: Any) -> bool:
# None means "the latest", and so sorts higher than anything (except None)
if other is None:
return True
if self.major != other.major:
return self.major < other.major
return self.minor < other.minor
def __str__(self) -> str:
return f"{self.major}.{self.minor}"
def __repr__(self) -> str:
return f"Version({self.major}, {self.minor})"
class SpecialRegisterConfig:
def __init__(
self,
*,
invalid_register_ranges: list[tuple[int, int]] | None = None,
individual_read_register_ranges: list[tuple[int, int]] | None = None,
) -> None:
if invalid_register_ranges is None:
invalid_register_ranges = []
self.invalid_register_ranges = invalid_register_ranges
if individual_read_register_ranges is None:
individual_read_register_ranges = []
self.individual_read_register_ranges = individual_read_register_ranges
H1_AC1_REGISTERS = SpecialRegisterConfig(invalid_register_ranges=[(11096, 39999)])
# See https://github.com/nathanmarlor/foxess_modbus/discussions/503
H3_REGISTERS = SpecialRegisterConfig(
invalid_register_ranges=[(41001, 41006), (41012, 41013), (41015, 41015)],
individual_read_register_ranges=[(41000, 41999)],
)
# See https://github.com/nathanmarlor/foxess_modbus/pull/512
KH_REGISTERS = SpecialRegisterConfig(
invalid_register_ranges=[(41001, 41006), (41012, 41012), (41019, 43999), (31045, 31999)],
individual_read_register_ranges=[(41000, 41999)],
)
# See https://github.com/nathanmarlor/foxess_modbus/discussions/553
H1_G2_REGISTERS = SpecialRegisterConfig(
individual_read_register_ranges=[(41000, 41999)],
)
@dataclass(kw_only=True)
class CapacityParser:
capacity_map: dict[str, int] | None
fallback_to_kw: bool
DEFAULT: ClassVar["CapacityParser"]
H1: ClassVar["CapacityParser"]
def parse(self, capacity_str: str, inverter_model: str) -> int:
if self.capacity_map is not None:
capacity = self.capacity_map.get(capacity_str)
if capacity is not None:
return capacity
if not self.fallback_to_kw:
raise Exception(f"Unknown capacity '{capacity_str}' for inverter model '{inverter_model}'")
try:
capacity = int(float(capacity_str) * 1000)
return capacity
except ValueError as ex:
raise Exception(f"Unable parse capacity '{capacity_str}' of inverter '{inverter_model}'") from ex
CapacityParser.DEFAULT = CapacityParser(capacity_map=None, fallback_to_kw=True)
CapacityParser.H1 = CapacityParser(capacity_map={"3.7": 3680}, fallback_to_kw=True)
class InverterModelConnectionTypeProfile:
"""Describes the capabilities of an inverter when connected to over a particular interface"""
def __init__(
self,
inverter_model_profile: "InverterModelProfile",
connection_type: ConnectionType,
register_type: RegisterType,
versions: dict[Version | None, Inv],
special_registers: SpecialRegisterConfig,
) -> None:
self.inverter_model_profile = inverter_model_profile
self.connection_type = connection_type
self.register_type = register_type
self.versions = versions
self.special_registers = special_registers
assert None in versions
def _get_inv(self, controller: EntityController) -> Inv:
version_from_config = controller.inverter_details.get(INVERTER_VERSION)
# Remember that self._versions is a map of maximum supported manager version (or None to support the max
# firmware version) -> Inv for that version
if version_from_config is None:
return self.versions[None]
inverter_version = Version.parse(version_from_config)
versions = sorted(self.versions.items(), reverse=True)
matched_version = next((x for x in versions if x[0] <= inverter_version), versions[0])
return matched_version[1]
def overlaps_invalid_range(self, start_address: int, end_address: int) -> bool:
"""Determines whether the given inclusive address range overlaps any invalid address ranges"""
return any(
r[0] <= end_address and start_address <= r[1] for r in self.special_registers.invalid_register_ranges
)
def is_individual_read(self, address: int) -> bool:
return any(r[0] <= address <= r[1] for r in self.special_registers.individual_read_register_ranges)
def create_entities(
self,
entity_type: type[Entity],
controller: EntityController,
) -> list[Entity]:
"""Create all of the entities of the given type which support this inverter/connection combination"""
result = []
for entity_factory in ENTITIES:
if entity_factory.entity_type == entity_type:
entity = entity_factory.create_entity_if_supported(
controller, self._get_inv(controller), self.register_type
)
if entity is not None:
result.append(entity)
return result
def create_charge_periods(self, controller: EntityController) -> list[ModbusChargePeriodInfo]:
"""Create all of the charge periods which support this inverter/connection combination"""
result = []
for charge_period_factory in CHARGE_PERIODS:
charge_period = charge_period_factory.create_charge_period_config_if_supported(
controller, self._get_inv(controller), self.register_type
)
if charge_period is not None:
result.append(charge_period)
return result
def create_remote_control_config(self, controller: EntityController) -> ModbusRemoteControlAddressConfig | None:
return REMOTE_CONTROL_DESCRIPTION.create_if_supported(controller, self._get_inv(controller), self.register_type)
class InverterModelProfile:
"""Describes the capabilities of an inverter model"""
def __init__(self, model: InverterModel, model_pattern: str, capacity_parser: CapacityParser | None = None) -> None:
self.model = model
self.model_pattern = model_pattern
self._capacity_parser = capacity_parser if capacity_parser is not None else CapacityParser.DEFAULT
self.connection_types: dict[ConnectionType, InverterModelConnectionTypeProfile] = {}
def add_connection_type(
self,
connection_type: ConnectionType,
register_type: RegisterType,
versions: dict[Version | None, Inv], # Map of maximum supported manager versions -> Inv for that
special_registers: SpecialRegisterConfig | None = None,
) -> "InverterModelProfile":
"""Add the given connection type to the profile"""
assert connection_type not in self.connection_types
if special_registers is None:
special_registers = SpecialRegisterConfig()
self.connection_types[connection_type] = InverterModelConnectionTypeProfile(
self,
connection_type,
register_type,
versions,
special_registers,
)
return self
def inverter_capacity(self, inverter_model: str) -> int:
match = re.match(self.model_pattern, inverter_model)
if match is None:
raise Exception(f"Unable to determine capacity of inverter '{inverter_model}'")
capacity = self._capacity_parser.parse(match.group(1), inverter_model)
return capacity
INVERTER_PROFILES = {
x.model: x
for x in [
# E.g. H1-5.0-E-G2. Has to appear before H1_G1.
InverterModelProfile(
InverterModel.H1_G2, r"^H1-([\d\.]+)-E-G2", capacity_parser=CapacityParser.H1
).add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={Version(1, 44): Inv.H1_G2_PRE144, None: Inv.H1_G2_144},
special_registers=H1_G2_REGISTERS,
),
# Can be both e.g. H1-5.0 and H1-5.0-E, but not H1-5.0-E-G2
InverterModelProfile(InverterModel.H1_G1, r"^H1-([\d\.]+)", capacity_parser=CapacityParser.H1)
.add_connection_type(
ConnectionType.AUX,
RegisterType.INPUT,
versions={None: Inv.H1_G1},
special_registers=H1_AC1_REGISTERS,
)
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.H1_LAN},
),
# AC1-5.0-E-G2. Has to appear before AC1 G1 see https://github.com/nathanmarlor/foxess_modbus/discussions/715
InverterModelProfile(
InverterModel.AC1, r"^AC1-([\d\.]+)-E-G2", capacity_parser=CapacityParser.H1
).add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={Version(1, 44): Inv.H1_G2_PRE144, None: Inv.H1_G2_144},
special_registers=H1_G2_REGISTERS,
),
InverterModelProfile(InverterModel.AC1, r"^AC1-([\d\.]+)", capacity_parser=CapacityParser.H1)
.add_connection_type(
ConnectionType.AUX,
RegisterType.INPUT,
versions={None: Inv.H1_G1},
special_registers=H1_AC1_REGISTERS,
)
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.H1_LAN},
),
InverterModelProfile(InverterModel.AIO_H1, r"^AIO-H1-([\d\.]+)", capacity_parser=CapacityParser.H1)
.add_connection_type(
ConnectionType.AUX,
RegisterType.INPUT,
versions={None: Inv.H1_G1},
special_registers=H1_AC1_REGISTERS,
)
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.H1_LAN},
),
InverterModelProfile(
InverterModel.AIO_AC1, r"^AIO-AC1-([\d\.]+)", capacity_parser=CapacityParser.H1
).add_connection_type(
ConnectionType.AUX,
RegisterType.INPUT,
versions={None: Inv.H1_G1},
special_registers=H1_AC1_REGISTERS,
),
# The KH doesn't have a LAN port. It supports both input and holding over RS485
# Some models start with KH-, but some are just e.g. KH10.5
InverterModelProfile(InverterModel.KH, r"^KH([\d\.]+)").add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={Version(1, 19): Inv.KH_PRE119, Version(1, 33): Inv.KH_PRE133, None: Inv.KH_133},
special_registers=KH_REGISTERS,
),
# The H3 seems to use holding registers for everything
InverterModelProfile(InverterModel.H3, r"^H3-([\d\.]+)")
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
)
.add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
),
InverterModelProfile(InverterModel.AC3, r"^AC3-([\d\.]+)")
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
)
.add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
),
InverterModelProfile(InverterModel.AIO_H3, r"^AIO-H3-([\d\.]+)")
.add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.AIO_H3},
special_registers=H3_REGISTERS,
)
.add_connection_type(
ConnectionType.LAN,
RegisterType.HOLDING,
versions={None: Inv.AIO_H3},
special_registers=H3_REGISTERS,
),
# Kuara 6.0-3-H: H3-6.0-E
# Kuara 8.0-3-H: H3-8.0-E
# Kuara 10.0-3-H: H3-10.0-E
# Kuara 12.0-3-H: H3-12.0-E
# I haven't seen any indication that these support a direct LAN connection
InverterModelProfile(InverterModel.KUARA_H3, r"^Kuara ([\d\.]+)-3-H$").add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.KUARA_H3},
special_registers=H3_REGISTERS,
),
# Sonnenkraft:
# SK-HWR-8: H3-8.0-E
# (presumably there are other sizes also)
InverterModelProfile(InverterModel.SK_HWR, r"^SK-HWR-([\d\.]+)").add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
),
# STAR
# STAR-H3-12.0-E: H3-12.0-E
# (presumably there are other sizes also)
InverterModelProfile(InverterModel.STAR_H3, r"^STAR-H3-([\d\.]+)").add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
),
# Solavita SP
# These have the form 'SP R8KH3', 'R10KH3', 'R12KH3', but the number doesn't map to a power
# https://www.svcenergy.com/product/three-phase-solar-power-hybrid-inverter-sih
InverterModelProfile(
InverterModel.SOLAVITA_SP,
r"^SP R(\d+)KH3",
capacity_parser=CapacityParser(
capacity_map={
"8": 10400,
"10": 13000,
"12": 15600,
},
fallback_to_kw=False,
),
).add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3},
special_registers=H3_REGISTERS,
),
# E.g. H3-Pro-20.0
InverterModelProfile(InverterModel.H3_PRO, r"^H3-Pro-([\d\.]+)").add_connection_type(
ConnectionType.AUX,
RegisterType.HOLDING,
versions={None: Inv.H3_PRO},
special_registers=H3_REGISTERS,
),
]
}
def create_entities(entity_type: type[Entity], controller: EntityController) -> list[Entity]:
"""Create all of the entities which support the inverter described by the given configuration object"""
return inverter_connection_type_profile_from_config(controller.inverter_details).create_entities(
entity_type, controller
)
def inverter_connection_type_profile_from_config(inverter_config: dict[str, Any]) -> InverterModelConnectionTypeProfile:
"""Fetches a InverterConnectionTypeProfile for a given configuration object"""
return INVERTER_PROFILES[inverter_config[INVERTER_BASE]].connection_types[inverter_config[INVERTER_CONN]]