diff --git a/Classes/OTA.py b/Classes/OTA.py index 7c49f746b..98daf390f 100644 --- a/Classes/OTA.py +++ b/Classes/OTA.py @@ -75,6 +75,9 @@ "Schneider": {"Folder": "SCHNEIDER-WISER", "ManufCode": 0x105E, "ManufName": "Schneider Electric", "Enabled": True}, "SonOff": {"Folder": "SONOFF", "ManufCode": 0x1286, "ManufName": "Sonoff", "Enabled": True}, "Xiaomi": {"Folder": "XIAOMI", "ManufCode": 0x115f, "ManufName": "Xiaomi", "Enabled": True}, + "Lumi": {"Folder": "XIAOMI", "ManufCode": 0x1037, "ManufName": "Lumi", "Enabled": True}, + "devbis": {"Folder": "XIAOMI", "ManufCode": 0xdb15, "ManufName": "Lumi", "Enabled": True}, + "z03mmc": {"Folder": "XIAOMI", "ManufCode": 0x0084, "ManufName": "Lumi", "Enabled": True}, } @@ -1026,7 +1029,7 @@ def ota_extract_image_headers(self, subfolder, image): # OK 13/10 logging( self, "Status", - "Available Firmware - ManufCode: %4x ImageType: 0x%04x FileVersion: 0x%8x Size: %8s Bytes Filename: %s" + "Available Firmware - ManufCode: 0x%04x ImageType: 0x%04x FileVersion: 0x%08x Size: %8s Bytes Filename: %s" % (headers["manufacturer_code"], headers["image_type"], headers["image_version"], headers["size"], image), ) diff --git a/Classes/PluginConf.py b/Classes/PluginConf.py index 0503bf733..b24c4332b 100644 --- a/Classes/PluginConf.py +++ b/Classes/PluginConf.py @@ -74,7 +74,7 @@ "PluginRetrys": {"type": "bool","default": 0,"current": None,"restart": 0,"hidden": True,"Advanced": True,}, "CaptureRxFrames": {"type": "bool","default": 0,"current": None,"restart": 1,"hidden": False,"Advanced": True,}, "CaptureTxFrames": {"type": "bool","default": 0,"current": None,"restart": 1,"hidden": False,"Advanced": True,}, - "enableZclDuplicatecheck": {"type": "bool","default": 1,"current": None,"restart": 0,"hidden": False,"Advanced": True,}, + "enableZclDuplicatecheck": {"type": "bool","default": 0,"current": None,"restart": 0,"hidden": False,"Advanced": True,}, "BackupFullDevices": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": False,"ZigpyRadio": "znp" }, "ForceAPSAck": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": True, "Advanced": True, }, "BellowsNoMoreEndDeviceChildren": { "type": "bool", "default": 0, "current": None, "restart": 1, "hidden": False, "Advanced": True, "ZigpyRadio": "ezsp" }, @@ -300,11 +300,14 @@ "Temperature": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "Thermostats": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, + "thermoSettings": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "ThreadCommunication": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "ThreadDomoticz": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "ThreadForwarder": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "ThreadWriter": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "Timing": { "type": "bool", "default": 1, "current": None, "restart": 0, "hidden": True, "Advanced": True }, + "TimeServer": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, + "Transport": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "Transport8000": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, "Transport8002": { "type": "bool", "default": 0, "current": None, "restart": 0, "hidden": False, "Advanced": True }, diff --git a/Classes/WebServer/rest_ZLinky.py b/Classes/WebServer/rest_ZLinky.py index 716fc880b..fc0b25c8c 100644 --- a/Classes/WebServer/rest_ZLinky.py +++ b/Classes/WebServer/rest_ZLinky.py @@ -27,39 +27,41 @@ "EASF06", "BBRHPJR", "EASF07", "EASF08", "EASF09", "EASF10", "EASD01", "EASD02", "EASD03", "EASD04", ] ZLINKY_PARAMETERS = { + # Historique 0: ( - "ADC0", "BASE", "OPTARIF", "ISOUSC", "IMAX", "PTEC", "DEMAIN", "HHPHC", "PEJP", "ADPS", + "ADC0", "BASE", "OPTARIF", "ISOUSC", "IMAX", "PTEC", "DEMAIN", "HHPHC", "PEJP", "ADPS", ), 2: ( "ADC0", "BASE", "OPTARIF", "ISOUSC", "IMAX", "IMAX1", "IMAX2", "IMAX3", "PMAX", "PTEC", "DEMAIN", "HHPHC", "PPOT", "PEJP", "ADPS", "ADIR1", "ADIR2", "ADIR3" ), + # Standard 1: ( "ADSC", "NGTF", "LTARF", "NTARF", "DATE", "EAST", "EASF01", "EASF02", "EASF03", "EASF04", "EASF05", "EASF06", "EASF07", "EASF08", "EASF09", "EASF10", "EASD01", "EASD02", "EASD03", "EASD04", "URMS1", - "PREF", "STGE", "PCOUP", + "PREF", "PCOUP", "MSG1", "MSG2", "PRM", "STGE", "DPM1", "FPM1", "DPM2", "FPM2", "DPM3", "FPM3", "RELAIS", "NJOURF", "NJOURF+1", "PJOURF+1", "PPOINTE1", ), 3: ( "ADSC", "NGTF", "LTARF", "NTARF", "DATE", "EAST", "EASF01", "EASF02", "EASF03", "EASF04", "EASF05", "EASF06", "EASF07", "EASF08", "EASF09", "EASF10", "EASD01", "EASD02", "EASD03", "EASD04", "URMS1", - "URMS2", "URMS3", "PREF", "STGE", "PCOUP", + "URMS2", "URMS3", "PREF", "PCOUP", "MSG1", "MSG2", "PRM", "STGE", "DPM1", "FPM1", "DPM2", "FPM2", "DPM3", "FPM3", "RELAIS", "NJOURF", "NJOURF+1", "PJOURF+1", "PPOINTE1", ), 5: ( "ADSC", "NGTF", "LTARF", "NTARF", "DATE", "EAST", "EASF01", "EASF02", "EASF03", "EASF04", "EASF05", "EASF06", "EASF07", "EASF08", "EASF09", "EASF10", "EASD01", "EASD02", "EASD03", "EASD04", "EAIT", "URMS1", - "PREF", "STGE", "PCOUP", "SINSTI", "SMAXIN", "SMAXIN-1", "CCAIN", "CCAIN-1", "SMAXN-1", "SMAXN2-1", "SMAXN3-1", + "PREF", "PCOUP", "SINSTI", "SMAXIN", "SMAXIN-1", "CCAIN", "CCAIN-1", "SMAXN-1", "SMAXN2-1", "SMAXN3-1", "MSG1", "MSG2", "PRM", "STGE", "DPM1", "FPM1", "DPM2", "FPM2", "DPM3", "FPM3", "RELAIS", "NJOURF", "NJOURF+1", "PJOURF+1", "PPOINTE1", ), 7: ( "ADSC", "NGTF", "LTARF", "NTARF", "DATE", "EAST", "EASF01", "EASF02", "EASF03", "EASF04", "EASF05", "EASF06", "EASF07", "EASF08", "EASF09", "EASF10", "EASD01", "EASD02", "EASD03", "EASD04", "EAIT", "URMS1", - "URMS2", "URMS3", "PREF", "STGE", "PCOUP", + "URMS2", "URMS3", "PREF", "PCOUP", "SINSTI", "SMAXIN", "SMAXIN-1", "CCAIN", "CCAIN-1", "SMAXN-1", "SMAXN2-1", "SMAXN3-1", "MSG1", "MSG2", "PRM", "STGE", "DPM1", "FPM1", "DPM2", "FPM2", "DPM3", "FPM3", "RELAIS", "NJOURF", "NJOURF+1", "PJOURF+1", "PPOINTE1", ), @@ -73,41 +75,14 @@ "BBR": ( "HHPHC", "HCHP","HCHC", "PEJP", "EJPHN", "EJPHPM",) } - -ZLINKY_STEG_ATTRIBUTS = ( - 'Contact sec ', - 'Organe de coupure ', - 'État du cache-bornes distributeur', - 'Surtension sur une des phases ', - 'Dépassement de la puissance de référence', - 'Fonctionnement producteur/consommateur', - 'Sens énergie active ', - 'Tarif en cours sur le contrat fourniture', - 'Tarif en cours sur le contrat distributeur', - 'Mode dégradée horloge', - 'État de la sortie télé-information ', - 'État de la sortie communication', - 'Statut du CPL ', - 'Synchronisation CPL ', - 'Couleur du jour', - 'Couleur du lendemain', - 'Préavis pointes mobiles ', - 'Pointe mobile ', -) def zlinky_version_infos(self, nwkid ): - - date_build = version_build = '' - # Retreive Build time - if 'SWBUILD_1' in self.ListOfDevices[ nwkid ]: - date_build = self.ListOfDevices[ nwkid ]['SWBUILD_1' ] - # Retreive Version number - if 'SWBUILD_3' in self.ListOfDevices[ nwkid ]: - version_build = self.ListOfDevices[ nwkid ]['SWBUILD_3' ] + cluster_0000 = self.ListOfDevices.get(nwkid, {}).get("Ep",{}).get("01", {}).get("0000",{}) - return date_build, version_build - + date_build = cluster_0000.get("0006","") + version_build = cluster_0000.get("4000","") - + self.logging("Debug", f"rest_zlinky - found date_build: {date_build} version_build {version_build}") + return date_build, version_build def rest_zlinky(self, verb, data, parameters): @@ -115,27 +90,28 @@ def rest_zlinky(self, verb, data, parameters): _response = prepResponseMessage(self, setupHeadersResponse()) _response["Data"] = None - self.logging("Debug", "rest_zlinky - for %s %s %s" % (verb, data, parameters)) + self.logging("Debug", "rest_zlinky - for %s %s %s" % (verb, data, parameters)) # find if we have a ZLinky zlinky = [] for nwkid in self.ListOfDevices: if 'ZLinky' not in self.ListOfDevices[ nwkid ]: continue - if "PROTOCOL Linky" not in self.ListOfDevices[ nwkid ]['ZLinky']: + zlinky_datas = self.ListOfDevices[ nwkid ].get("ZLinky") + if zlinky_datas is None: + continue + + if "PROTOCOL Linky" not in zlinky_datas: continue - if "OPTARIF" not in self.ListOfDevices[ nwkid ]['ZLinky']: + if "OPTARIF" not in zlinky_datas: continue - self.logging("Debug", "rest_zlinky - found %s " % (nwkid)) - tarif = "BASE" - for _tarif in ZLINK_TARIF_MODE_EXCLUDE: - if _tarif in self.ListOfDevices[ nwkid ]['ZLinky'][ "OPTARIF"]: - tarif = _tarif - break + self.logging("Debug", "rest_zlinky - found %s " % (nwkid)) + tarif = next( ( _tarif for _tarif in ZLINK_TARIF_MODE_EXCLUDE if _tarif in zlinky_datas["OPTARIF"] ), "BASE", ) - linky_mode = self.ListOfDevices[ nwkid ]["ZLinky"]["PROTOCOL Linky"] + linky_mode = zlinky_datas["PROTOCOL Linky"] version_info = zlinky_version_infos(self, nwkid ) + device = { 'Nwkid': nwkid, 'ZDeviceName': get_device_nickname( self, NwkId=nwkid), @@ -150,27 +126,28 @@ def rest_zlinky(self, verb, data, parameters): self.logging("Debug", "rest_zlinky - Linky DateCode %s " % version_info[0]) self.logging("Debug", "rest_zlinky - Linky Version %s " %version_info[1]) - for zlinky_param in ZLINKY_PARAMETERS[ linky_mode ]: - if zlinky_param not in self.ListOfDevices[ nwkid ]["ZLinky"]: + if zlinky_param not in zlinky_datas: self.logging("Debug", "rest_zlinky - Exclude %s " % (zlinky_param)) continue + if zlinky_param in ZLINK_TARIF_MODE_EXCLUDE[ tarif ]: self.logging("Debug", "rest_zlinky - Exclude %s " % (zlinky_param)) continue + if zlinky_param == "STGE": - #for x in self.ListOfDevices[ nwkid ]["ZLinky"][ "STGE"]: - # device["Parameters"].append( { x: self.ListOfDevices[ nwkid ]["ZLinky"]["STGE"][x] } ) + for x in zlinky_datas[ "STGE"]: + device["Parameters"].append( { f"STGE: {x}": zlinky_datas["STGE"][x] } ) continue - attr_value = self.ListOfDevices[ nwkid ]["ZLinky"][ zlinky_param ] + attr_value = zlinky_datas[ zlinky_param ] if zlinky_param in ZLINKY_INDEXES: attr_value = int(attr_value) / 1000 device["Parameters"].append( { zlinky_param: attr_value } ) - + zlinky.append( device ) - + self.logging("Debug", "rest_zlinky - Read to send %s " % (zlinky)) if verb == "GET" and len(parameters) == 0: diff --git a/DevicesModules/custom_zlinky.py b/DevicesModules/custom_zlinky.py index c14004a3a..32a60c6fe 100644 --- a/DevicesModules/custom_zlinky.py +++ b/DevicesModules/custom_zlinky.py @@ -11,11 +11,13 @@ # SPDX-License-Identifier: GPL-3.0 license import binascii +import json from Modules.domoMaj import MajDomoDevice from Modules.readAttributes import (ReadAttributeReq_Scheduled_ZLinky, ReadAttributeRequest_ff66) -from Modules.tools import checkAndStoreAttributeValue, getAttributeValue +from Modules.tools import (checkAndStoreAttributeValue, + get_device_config_param, getAttributeValue) from Modules.zlinky import (ZLINK_CONF_MODEL, ZLinky_TIC_COMMAND, convert_kva_to_ampere, decode_STEG, get_linky_mode_from_ep, get_ltarf, get_OPTARIF, @@ -53,17 +55,15 @@ def zlinky_meter_identification(self, nwkid, ep, cluster, attribut, value): checkAndStoreAttributeValue( self, nwkid, ep, cluster, attribut, value, ) if attribut == "000d": - # Looks like in standard mode PREF is in VA while in historique mode ISOUSC is in A - # Donc en mode standard ISOUSC = ( value * 1000) / 200 - if "ZLinky" in self.ListOfDevices[nwkid] and "PROTOCOL Linky" in self.ListOfDevices[nwkid]["ZLinky"]: - if self.ListOfDevices[nwkid]["ZLinky"]["PROTOCOL Linky"] in (0, 2): - # Mode Historique mono ( 0 ) - # Mode Historique tri ( 2 ) - store_ZLinky_infos( self, nwkid, 'ISOUSC', value) - else: - # Mode standard - store_ZLinky_infos( self, nwkid, 'PREF', value) - store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) ) + protocol_linky = self.ListOfDevices[nwkid].get( "ZLinky", {}).get( "PROTOCOL Linky") + + if protocol_linky in (0, 2): + # Mode Historique , it is given in A + store_ZLinky_infos( self, nwkid, 'ISOUSC', value) + else: + # Mode standard , needs to convert the KVA into A + store_ZLinky_infos( self, nwkid, 'PREF', value) + store_ZLinky_infos( self, nwkid, 'ISOUSC', convert_kva_to_ampere(value) ) elif attribut == "000a": store_ZLinky_infos( self, nwkid, 'VTIC', value) @@ -84,77 +84,15 @@ def zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster attribut: The attribute being processed. value: The current value of the attribute. """ - - def _normalize_tempo_color(color): - if "HP" in color: - prefix = "HP" - elif "HC" in color: - prefix = "HC" - else: - return color # Return the original color if neither "HP" nor "HC" is found - - if "ROUGE" in color: - return "R" + prefix - if "BLANC" in color: - return "W" + prefix - if "BLEU" in color: - return "B" + prefix - return color # Return the original color if no matching color is found - - - def _zlinky_update_color(nwkid, previous_color, new_color): - """Update the device color, if it has changed request a Read Attribute to get the Color""" - - if get_linky_mode_from_ep(self, nwkid) in ( 0, 2): - # Historique mode, we can rely on PTEC - ptect_value = get_ptec(self, nwkid) - self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - PTEC {ptect_value}", nwkid) - - if ptect_value and ptect_value != new_color: - # Looks like the PTEC info is not aligned with the current color ! - self.log.logging("ZLinky", "Status", f"Requesting PTEC as not inline {ptect_value} to {previous_color}/{new_color}", nwkid) - ReadAttributeReq_Scheduled_ZLinky(self, nwkid) - zlinky_color_tarif(self, nwkid, new_color) - return - - # Standard mode, we rely on LTARF ( Libellé tarif fournisseur en cours) - ltarf_value = _normalize_tempo_color( get_ltarf(self, nwkid) ) - self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - LTARF >{ltarf_value}<", nwkid) - - if ltarf_value and ltarf_value != new_color: - self.log.logging("ZLinky", "Status", f"Requesting LTARF (0xff66) as not inline {ltarf_value} to {previous_color}/{new_color}", nwkid) - ReadAttributeRequest_ff66(self, nwkid) - zlinky_color_tarif(self, nwkid, new_color) - - - def get_corresponding_color(attribut, op_tarifiare): - """Determine the new color based on the attribute and tariff type.""" - color_map = { - "HC..": { - "0100": "HC..", - "0102": "HP.."}, - "TEMPO": { - "0100": "BHC", - "0102": "BHP", - "0104": "WHC", - "0106": "WHP", - "0108": "RHC", - "010a": "RHP"}, - "EJP": { - "0100": "EJPHN", - "0102": "EJPHPM"} - } - self.log.logging("ZLinky", "Debug", f"get_corresponding_color: >{op_tarifiare}< >{attribut}<", nwkid) - return color_map.get(op_tarifiare, {}).get(attribut) - self.log.logging("ZLinky", "Debug", f"Cluster: {cluster}, Attribute: {attribut}, Value: {value}", nwkid) # Fetch current tariff - op_tarifiare = get_OPTARIF(self, nwkid) - self.log.logging("ZLinky", "Debug", f"OPTARIF: {op_tarifiare}", nwkid) + op_tarifaire = _normalize_tarif(self, get_OPTARIF(self, nwkid) ) + self.log.logging("ZLinky", "Debug", f"OPTARIF: {op_tarifaire}", nwkid) # Exit early for unsupported tariffs - if op_tarifiare == "BASE" or op_tarifiare not in {"TEMPO", "HC.."}: + if not _known_op_tarifaire(op_tarifaire): + self.log.logging("ZLinky", "Error", f"get_corresponding_color - unknown op_tarifaire {op_tarifaire}", nwkid) return # Get previous values @@ -170,19 +108,117 @@ def get_corresponding_color(attribut, op_tarifiare): self.log.logging("ZLinky", "Debug", f"PrevValue: {previous_value}, PrevValueAttributColor: {previous_color_value} PrevColor: {previous_color}", nwkid) # Determine the current color - new_color = get_corresponding_color(attribut, op_tarifiare) + new_color = _get_corresponding_color(self, attribut, op_tarifaire) if not new_color: return - # Handle updates for non-TEMPO tariffs - if op_tarifiare != "TEMPO": - self.log.logging("ZLinky", "Debug", f"Non-TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) - _zlinky_update_color(nwkid, previous_color, new_color) + self.log.logging("ZLinky", "Debug", f"TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) + _zlinky_update_color(self, nwkid, op_tarifaire, previous_color, new_color) + + +def _known_op_tarifaire(op_tarifaire): + """ check that is a known and valid tarif""" + # Set of valid prefixes + valid_prefixes = {"BASE", "TEMPO", "HC", "BBR", "EJP"} + + # Check if op_tarifaire matches any of the valid prefixes + return any(op_tarifaire.startswith(prefix) for prefix in valid_prefixes) + + +def _normalize_tarif(self, op_tarifaire): + """ Normalize Op Tarif """ + if op_tarifaire.startswith("BBR"): + base_tarifaire = "TEMPO" # Treat any BBRx as TEMPO + elif op_tarifaire.startswith("EJP"): + base_tarifaire = "EJP" # Treat any EJPx as EJP + else: + base_tarifaire = op_tarifaire + self.log.logging("ZLinky", "Debug", f"_normalize_tarif {op_tarifaire} -> {base_tarifaire}") + return base_tarifaire + + +def _is_tempo_tarif(self, op_tarifaire): + """ Return true if the current op tarif is Tempo """ + return _normalize_tarif(self, op_tarifaire ) == "TEMPO" + + +def _zlinky_update_color(self, nwkid, op_tarifaire, previous_color, new_color): + """Update the device color, if it has changed request a Read Attribute to get the Color""" + + if get_linky_mode_from_ep(self, nwkid) in ( 0, 2): + # Historique mode, we can rely on PTEC + ptect_value = get_ptec(self, nwkid) + if _is_tempo_tarif(self, op_tarifaire): + ptect_value = _normalize_tempo_color(self, ptect_value) + + self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - PTEC {ptect_value}", nwkid) + + if ptect_value and ptect_value != new_color: + # Looks like the PTEC info is not aligned with the current color ! + self.log.logging("ZLinky", "Status", f"Requesting PTEC as not inline op_tarifaire: {op_tarifaire} ptec: {ptect_value} to prev_volor: {previous_color} new_color: {new_color}", nwkid) + ReadAttributeReq_Scheduled_ZLinky(self, nwkid) + zlinky_color_tarif(self, nwkid, new_color) return - # Handle updates for TEMPO-specific tariffs - self.log.logging("ZLinky", "Debug", f"TEMPO: PrevColor: {previous_color}, NewColor: {new_color}", nwkid) - _zlinky_update_color(nwkid, previous_color, new_color) + # Standard mode, we rely on LTARF ( Libellé tarif fournisseur en cours) + ltarf_value = get_ltarf(self, nwkid) + if _is_tempo_tarif(self, op_tarifaire): + ltarf_value = _normalize_tempo_color(self, ltarf_value) + self.log.logging("ZLinky", "Debug", f"_zlinky_update_color - LTARF >{ltarf_value}<", nwkid) + + if ltarf_value and ltarf_value != new_color: + self.log.logging("ZLinky", "Status", f"Requesting LTARF (0xff66) as not inline {ltarf_value} to {previous_color}/{new_color}", nwkid) + ReadAttributeRequest_ff66(self, nwkid) + zlinky_color_tarif(self, nwkid, new_color) + + +def _get_corresponding_color(self, attribut, op_tarifaire): + """Determine the new color based on the attribute and tariff type, with support for extended prefixes.""" + # Determine the base tariff type (handle variations like BBRx, EJPx) + base_tarifaire = _normalize_tarif(self, op_tarifaire) + + # Define the color map for tariff types + color_map = { + "HC..": { "0100": "HC..", "0102": "HP.."}, + "TEMPO": { "0100": "BHC", "0102": "BHP", "0104": "WHC", "0106": "WHP", "0108": "RHC", "010a": "RHP"}, + "EJP": { "0100": "EJPHN", "0102": "EJPHPM"} + } + + # Log the parameters for debugging + self.log.logging("ZLinky", "Debug", f"get_corresponding_color: >{op_tarifaire}/{base_tarifaire}< >{attribut}<") + + # Return the corresponding color based on the base tariff type and attribute + return color_map.get(base_tarifaire, {}).get(attribut) + + +def _normalize_tempo_color(self, color): + """Normalize the given color to the Tempo format.""" + self.log.logging("ZLinky", "Debug", f"_normalize_tempo_color - {color}") + + if "HP" in color: + suffix = "HP" + elif "HC" in color: + suffix = "HC" + else: + return color # No "HP" or "HC" found, return as is + + color_map = { + "ROUGE": "R", + "R": "R", + "BLANC": "W", + "W": "W", + "BLEU": "B", + "B": "B", + } + + for key, value in color_map.items(): + if key in color: + tempo_color = f"{value}{suffix}" + self.log.logging("ZLinky", "Debug", f"_normalize_tempo_color - {color} => {tempo_color}") + return f"{tempo_color}" + + self.log.logging("ZLinky", "Debug", f"_normalize_tempo_color - no color found {color} - {suffix}") + return color # No matching color found, return original def zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut, value): @@ -197,17 +233,19 @@ def zlinky_cluster_metering(self, domoticz_devices, nwkid, ep, cluster, attribut attribut: The attribute being processed. value: The current value of the attribute. """ - def handle_attribut_value(attribut, store_keys=None, update_color=False, totalize=False, maj_ep=None): + def _handle_attribut_value(attribut, store_keys=None, update_color=False, totalize=False, maj_ep=None): """Helper function to handle attribute values.""" + if not value: return - self.log.logging("ZLinky", "Debug", f"Cluster0702 - {attribut} ZLinky_TIC Value: {value}", nwkid) + self.log.logging("ZLinky", "Debug", f"zlinky_cluster_metering - {attribut} ZLinky_TIC Value: {value}", nwkid) maj_ep = maj_ep or ep - MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) if attribut == "0020": MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", value, Attribute_="0020") zlinky_color_tarif(self, nwkid, str(value)) + else: + MajDomoDevice(self, domoticz_devices, nwkid, maj_ep, cluster, str(value), Attribute_=attribut) if update_color: zlinky_set_color_based_on_counter(self, domoticz_devices, nwkid, ep, cluster, attribut, value) @@ -229,21 +267,21 @@ def handle_attribut_value(attribut, store_keys=None, update_color=False, totaliz # Define attribute handlers attribute_handlers = { - "0000": lambda: handle_attribut_value("0000", ["BASE", "EAST"]), - "0001": lambda: handle_attribut_value("0001", ["EAIT"]), - "0020": lambda: handle_attribut_value("0020", ["PTEC"]), - "0100": lambda: handle_attribut_value("0100", ["EASF01", "HCHC", "EJPHN", "BBRHCJB"], update_color=True, totalize=True), - "0102": lambda: handle_attribut_value("0102", ["EASF02", "HCHP", "EJPHPM", "BBRHCJW"], update_color=True, totalize=True), - "0104": lambda: handle_attribut_value("0104", ["EASF03", "BBRHCJW"], update_color=True, totalize=True, maj_ep="f2"), - "0106": lambda: handle_attribut_value("0106", ["EASF04", "BBRHPJW"], update_color=True, totalize=True, maj_ep="f2"), - "0108": lambda: handle_attribut_value("0108", ["EASF05", "BBRHCJR"], update_color=True, totalize=True, maj_ep="f3"), - "010a": lambda: handle_attribut_value("010a", ["EASF06", "BBRHPJR"], update_color=True, totalize=True, maj_ep="f3"), - "010c": lambda: handle_attribut_value("010c", ["EASF07"]), - "010e": lambda: handle_attribut_value("010e", ["EASF08"]), - "0110": lambda: handle_attribut_value("0110", ["EASF09"]), - "0112": lambda: handle_attribut_value("0112", ["EASF10"]), + "0000": lambda: _handle_attribut_value("0000", ["BASE", "EAST"]), + "0001": lambda: _handle_attribut_value("0001", ["EAIT"]), + "0020": lambda: _handle_attribut_value("0020", ["PTEC"]), + "0100": lambda: _handle_attribut_value("0100", ["EASF01", "HCHC", "EJPHN", "BBRHCJB"], update_color=True, totalize=True), + "0102": lambda: _handle_attribut_value("0102", ["EASF02", "HCHP", "EJPHPM", "BBRHCJW"], update_color=True, totalize=True), + "0104": lambda: _handle_attribut_value("0104", ["EASF03", "BBRHCJW"], update_color=True, totalize=True, maj_ep="f2"), + "0106": lambda: _handle_attribut_value("0106", ["EASF04", "BBRHPJW"], update_color=True, totalize=True, maj_ep="f2"), + "0108": lambda: _handle_attribut_value("0108", ["EASF05", "BBRHCJR"], update_color=True, totalize=True, maj_ep="f3"), + "010a": lambda: _handle_attribut_value("010a", ["EASF06", "BBRHPJR"], update_color=True, totalize=True, maj_ep="f3"), + "010c": lambda: _handle_attribut_value("010c", ["EASF07"]), + "010e": lambda: _handle_attribut_value("010e", ["EASF08"]), + "0110": lambda: _handle_attribut_value("0110", ["EASF09"]), + "0112": lambda: _handle_attribut_value("0112", ["EASF10"]), "0307": lambda: store_ZLinky_infos(self, nwkid, "PRM", value), - "0308": lambda: handle_attribut_value("0308", ["ADC0", "ADSC"]), + "0308": lambda: _handle_attribut_value("0308", ["ADC0", "ADSC"]), } # Process attribute using handler @@ -302,8 +340,6 @@ def zlinky_cluster_electrical_measurement(self, domoticz_devices, nwkid, ep, clu self.log.logging( "ZLinky", "Debug", "zlinky_cluster_electrical_measurement %s - %s/%s %s Current L1 %s" % ( cluster, nwkid, ep, attribut, value), nwkid, ) - # from random import randrange - # value = randrange( 0x0, 0x3c) if value == 0xFFFF: return @@ -495,14 +531,9 @@ def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, att elif attribut == "0001": # Histo : DEMAIN value = ''.join(map(lambda x: x if ord(x) in range(128) else ' ', value)) - tarif = None - if ( - "ff66" in self.ListOfDevices[nwkid]["Ep"][ep] - and "0000" in self.ListOfDevices[nwkid]["Ep"][ep]["ff66"] - and self.ListOfDevices[nwkid]["Ep"][ep]["ff66"]["0000"] - not in ("", {}) - ): - tarif = self.ListOfDevices[nwkid]["Ep"][ep]["ff66"]["0000"] + + # Extract tarif if conditions are met + tarif = self.ListOfDevices[nwkid]["Ep"][ep].get("ff66", {}).get("0000") if tarif and "BBR" not in tarif: return @@ -510,13 +541,13 @@ def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, att self.log.logging([ "ZLinky", "Cluster"], "Debug", f"zlinky_cluster_lixee_private ({attribut}) DEMAIN {value}", nwkid) if value == "BLAN": - MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "20|Tomorrow WHITE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "2|Tomorrow WHITE day", Attribute_="0001") elif value == "BLEU": - MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "10|Tomorrow BLUE day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "1|Tomorrow BLUE day", Attribute_="0001") elif value == "ROUG": - MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "40|Tomorrow RED day", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "4|Tomorrow RED day", Attribute_="0001") else: - MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "00|No information", Attribute_="0001") + MajDomoDevice(self, domoticz_devices, nwkid, ep, "0009", "0|No information", Attribute_="0001") checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, value) @@ -590,7 +621,13 @@ def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, att tarif_mapping = { "BLEU": "B", "BLAN": "W", - "ROUG": "R" + "ROUG": "R", + "HP BLEU": "B", + "HC BLEU": "B", + "HP BLANC": "W", + "HC BLANC": "W", + "HP ROUGE": "R", + "HC ROUGE": "R", } s_tarif = next((tarif_mapping[key] for key in tarif_mapping if key in value), "") @@ -686,6 +723,8 @@ def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, att self.log.logging( "ZLinky", "Log", "STGE decoded %s : %s" % ( stge, decode_STEG( stge ) )) store_ZLinky_infos( self, nwkid, "STGE", decode_STEG( stge )) checkAndStoreAttributeValue(self, nwkid, ep, cluster, attribut, stge) + + process_tomorrow_color(self, domoticz_devices, nwkid) elif attribut in ( "0218", ): # Standard : DPM1 @@ -744,4 +783,40 @@ def zlinky_cluster_lixee_private(self, domoticz_devices, nwkid, ep, cluster, att elif attribut == "0300": # Linky Mode - update_zlinky_device_model_if_needed( self, nwkid ) \ No newline at end of file + update_zlinky_device_model_if_needed( self, nwkid ) + + +def process_tomorrow_color(self, domoticz_devices, nwkid): + + self.log.logging( "ZLinky", "Log", f"process_tomorrow_color {nwkid}") + + # We have receive a STGE info, let's check if the Color has been updated, in that case troger an update + zlinky_infosstge_infos = self.ListOfDevices[ nwkid].get("ZLinky",{}).get("STGE") + + if zlinky_infosstge_infos is None: + return + + preavis_pointe_mobile = zlinky_infosstge_infos.get("preavis_point_mobile") + pointe_mobile = zlinky_infosstge_infos.get("pointe_mobile") + + couleur_du_jour = zlinky_infosstge_infos.get("couleur_jour") + couleur_lendemain = zlinky_infosstge_infos.get("couleur_demain") + + self.log.logging( "ZLinky", "Log", f"process_tomorrow_color {nwkid} Jour: {couleur_du_jour} Demain: {couleur_lendemain}") + self.log.logging( "ZLinky", "Log", f"process_tomorrow_color {nwkid} Preavis: {preavis_pointe_mobile} Pointe: {pointe_mobile}") + + if couleur_lendemain is None: + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", "00|No information", Attribute_="0001") + return + + if couleur_lendemain in ( "Pas d'annonce", "Bleu", "Blanc", "Rouge"): + if couleur_lendemain == "Bleu": + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", "1|Tomorrow BLUE day", Attribute_="0001") + elif couleur_lendemain == "Blanc": + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", "2|Tomorrow WHITE day", Attribute_="0001") + elif couleur_lendemain == "Rouge": + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", "4|Tomorrow RED day", Attribute_="0001") + else: + MajDomoDevice(self, domoticz_devices, nwkid, "01", "0009", "00|No Information yet", Attribute_="0001") + return + diff --git a/Modules/domoMaj.py b/Modules/domoMaj.py index db499ae2d..da42474ea 100644 --- a/Modules/domoMaj.py +++ b/Modules/domoMaj.py @@ -36,7 +36,8 @@ zigpy_plugin_sanity_check) from Modules.zigateConsts import THERMOSTAT_MODE_2_LEVEL from Modules.zlinky import (ZLINK_CONF_MODEL, get_instant_power, - get_tarif_color, zlinky_sum_all_indexes) + get_notification_day_color, get_tarif_color, + zlinky_sum_all_indexes) from Zigbee.zdpCommands import zdp_IEEE_address_request WIDGET_TO_BYPASS_EP_MATCH = ("XCube", "Aqara", "DSwitch", "DButton", "DButton_3") @@ -131,8 +132,7 @@ def _domo_maj_one_cluster_type_entry( self, Devices, NwkId, Ep, device_id_ieee, ) return - value, text = tuple_value - nValue = int(value) + nValue, text = int(tuple_value[0]), tuple_value[1] update_domoticz_widget(self, Devices, device_id_ieee, device_unit, nValue, text, BatteryLevel, SignalLevel) if WidgetType == "BatteryPercentage" and ClusterType == "Voltage" and Attribute_ == "0021": @@ -142,71 +142,9 @@ def _domo_maj_one_cluster_type_entry( self, Devices, NwkId, Ep, device_id_ieee, if ClusterType == "Alarm" and WidgetType == "Alarm_ZL3" and Attribute_ == "0020": if value is None or len(value) == 0: return - # Notification Day Color and Peak - if value == "TH..": - # Toutes Heures - nValue = 0 - sValue = "All Hours" - - elif value == "HC..": - # Heures Creuses - nValue = 1 - sValue = "Off-peak Hours" - - elif value == "HP..": - # Heures Pleines - nValue = 2 - sValue = "Peak Hours" - - elif value == "HN..": - # Heures Normales - nValue = 1 - sValue = "Normal Hours" + nValue, sValue = get_notification_day_color( value ) + self.log.logging( "Widget", "Log", f"ZLinky Color of the Day {value} => {nValue},{sValue}", NwkId, ) - elif value == "PM..": - # Pointe Mobile - nValue = 4 - sValue = "Mobile peak Hours" - - # Standard Tempo - elif value == "BHC": - nValue = 1 - sValue = "Bleu HC" - elif value == "BHP": - nValue = 1 - sValue = "Bleu HP" - - elif value == "WHC": - nValue = 2 - sValue = "Blanc HC" - elif value == "WHP": - nValue = 2 - sValue = "Blanc HP" - - elif value == "RHC": - nValue = 4 - sValue = "Rouge HC" - elif value == "RHP": - nValue = 4 - sValue = "Rouge HP" - - elif value[0] == "B": - # Blue - nValue = 1 - sValue = "Blue Hours" - elif value[0] == "W": - # Whte - nValue = 2 - sValue = "White Hours" - elif value[0] == "R": - # Red - nValue = 4 - sValue = "RED Hours" - - else: - # Unknow - nValue = 3 - sValue = "Unknown" update_domoticz_widget(self, Devices, device_id_ieee, device_unit, nValue, sValue, BatteryLevel, SignalLevel) if "Ampere" in ClusterType and WidgetType == "Ampere" and Attribute_ == "0508": diff --git a/Modules/heartbeat.py b/Modules/heartbeat.py index 9a5d10a77..054a4dedc 100755 --- a/Modules/heartbeat.py +++ b/Modules/heartbeat.py @@ -89,65 +89,6 @@ FIRST_PING_VIA_GROUP = 127 // HEARTBEAT -#def attributeDiscovery(self, NwkId): -# -# rescheduleAction = False -# # If Attributes not yet discovered, let's do it -# -# if "ConfigSource" not in self.ListOfDevices[NwkId]: -# return False -# -# if self.ListOfDevices[NwkId]["ConfigSource"] == "DeviceConf": -# return False -# -# if "Attributes List" in self.ListOfDevices[NwkId] and len(self.ListOfDevices[NwkId]["Attributes List"]) > 0: -# return False -# -# if "Attributes List" not in self.ListOfDevices[NwkId]: -# self.ListOfDevices[NwkId]["Attributes List"] = {'Ep': {}} -# if "Request" not in self.ListOfDevices[NwkId]["Attributes List"]: -# self.ListOfDevices[NwkId]["Attributes List"]["Request"] = {} -# -# for iterEp in list(self.ListOfDevices[NwkId]["Ep"]): -# if iterEp == "ClusterType": -# continue -# if iterEp not in self.ListOfDevices[NwkId]["Attributes List"]["Request"]: -# self.ListOfDevices[NwkId]["Attributes List"]["Request"][iterEp] = {} -# -# for iterCluster in list(self.ListOfDevices[NwkId]["Ep"][iterEp]): -# if iterCluster in ("Type", "ClusterType", "ColorMode"): -# continue -# if iterCluster not in self.ListOfDevices[NwkId]["Attributes List"]["Request"][iterEp]: -# self.ListOfDevices[NwkId]["Attributes List"]["Request"][iterEp][iterCluster] = 0 -# -# if self.ListOfDevices[NwkId]["Attributes List"]["Request"][iterEp][iterCluster] != 0: -# continue -# -# if not self.busy and self.ControllerLink.loadTransmit() <= MAX_LOAD_ZIGATE: -# if int(iterCluster, 16) < 0x0FFF: -# getListofAttribute(self, NwkId, iterEp, iterCluster) -# # getListofAttributeExtendedInfos(self, NwkId, EpOut, cluster, start_attribute=None, manuf_specific=None, manuf_code=None) -# elif ( -# "Manufacturer" in self.ListOfDevices[NwkId] -# and len(self.ListOfDevices[NwkId]["Manufacturer"]) == 4 -# and is_hex(self.ListOfDevices[NwkId]["Manufacturer"]) -# ): -# getListofAttribute( -# self, -# NwkId, -# iterEp, -# iterCluster, -# manuf_specific="01", -# manuf_code=self.ListOfDevices[NwkId]["Manufacturer"], -# ) -# # getListofAttributeExtendedInfos(self, NwkId, EpOut, cluster, start_attribute=None, manuf_specific=None, manuf_code=None) -# -# self.ListOfDevices[NwkId]["Attributes List"]["Request"][iterEp][iterCluster] = time.time() -# -# else: -# rescheduleAction = True -# -# return rescheduleAction def attributeDiscovery(self, NwkId): # If Attributes not yet discovered, let's do it @@ -341,51 +282,77 @@ def pollingManufSpecificDevices(self, NwkId, HB): "TempPollingFreq": ReadAttributeRequest_0402, "HumiPollingFreq": ReadAttributeRequest_0405, "BattPollingFreq": ReadAttributeRequest_0001, - "ZLinkyIndexes": ReadAttributeReq_Scheduled_ZLinky, # Based on a specific time - "ZLinkyPollingPTEC": ReadAttributeReq_Scheduled_ZLinky, # Every 15' by default - "ZLinkyPolling0702": ReadAttributeRequest_0702_ZLinky_TIC, - "ZLinkyPollingGlobal": ReadAttributeReq_ZLinky, - "PollingCusterff66": ReadAttributeRequest_ff66, + "ZLinkyPollingPTEC": ReadAttributeReq_Scheduled_ZLinky, # Color of day and next day + "ZLinkyPolling0702": ReadAttributeRequest_0702_ZLinky_TIC, # Metering + "ZLinkyPollingGlobal": ReadAttributeReq_ZLinky, # All ZLinky Clusters/Attributes + "PollingCusterff66": ReadAttributeRequest_ff66, # All Manufacturer Specific ZLinky attributes "InletTempPolling": ReadAttributeRequest_0702_0017, # Retreive Inlet Temperature } - if "Param" not in self.ListOfDevices[NwkId]: + def _scheduled_zlinky_read(self, NwkId, parameter, device_parameters, heartbeat_counter): + """Handles scheduled ZLinky read operations based on time or heartbeat intervals.""" + + _current_time = datetime.datetime.now().strftime("%H:%M") + _target_value = device_parameters.get(parameter) + + # Determine execution condition + should_execute = False + if isinstance( _target_value, str) and ":" in _target_value: + should_execute = (_current_time == _target_value) + + elif isinstance( _target_value, (int, float)): + _target_value = _target_value // HEARTBEAT + if _target_value != 0: + should_execute = (heartbeat_counter % _target_value) == 0 + + self.log.logging( + ["Heartbeat", "ZLinky"], "Debug", + f"++ pollingManufSpecificDevices - {NwkId} {parameter}: Current: {_current_time} Target: {_target_value} should_execute {should_execute}", + NwkId, + ) + + if should_execute: + if "ScheduledZLinkyRead" in self.ListOfDevices[NwkId]: + return + if parameter == "ZLinkyPollingPTEC": + self.log.logging("Heartbeat", "Status", "Reading ZLinky Color of Day and Next Day") + + self.ListOfDevices[NwkId]["ScheduledZLinkyRead"] = True + func = FUNC_MANUF[param] + func(self, NwkId) + + elif "ScheduledZLinkyRead" in self.ListOfDevices[NwkId]: + # Prevent multiple executions within the same time unit + self.ListOfDevices[NwkId].pop("ScheduledZLinkyRead", None) + + + device_parameters = self.ListOfDevices[NwkId].get("Param") + if device_parameters is None: return False if self.busy or self.ControllerLink.loadTransmit() > MAX_LOAD_ZIGATE: return True - if "LastPollingManufSpecificDevices" in self.ListOfDevices[ NwkId ] and self.ListOfDevices[ NwkId ][ "LastPollingManufSpecificDevices"] == HB: + last_polling = self.ListOfDevices[ NwkId ].get("LastPollingManufSpecificDevices") + if last_polling and last_polling == HB: return False self.log.logging( "Heartbeat", "Debug", "++ pollingManufSpecificDevices - %s " % (NwkId,), NwkId, ) - for param in self.ListOfDevices[NwkId]["Param"]: - if param == "ZLinkyPollingPTEC": - # We are requesting to execute at a particular time - _current_time = datetime.datetime.now().strftime("%H:%M" ) - _target_time = self.ListOfDevices[NwkId]["Param"][ param ] - self.log.logging( "Heartbeat", "Debug", "++ pollingManufSpecificDevices - %s ScheduledZLinkyRead: Current: %s Target: %s" % ( - NwkId,_current_time, _target_time ), NwkId, ) - - if _current_time == _target_time and "ScheduledZLinkyRead" not in self.ListOfDevices[ NwkId ]: - self.ListOfDevices[ NwkId ][ "ScheduledZLinkyRead" ] = True - ReadAttributeReq_Scheduled_ZLinky( self, NwkId) - ReadAttributeRequest_ff66( self, NwkId) - - elif _current_time != _target_time and "ScheduledZLinkyRead" in self.ListOfDevices[ NwkId ]: - del self.ListOfDevices[ NwkId ][ "ScheduledZLinkyRead" ] + for param in device_parameters: + if param in ("ZLinkyPollingPTEC", "ScheduledZLinkyRead", "ZLinkyPolling0702", "ZLinkyPollingGlobal", "PollingCusterff66"): + _scheduled_zlinky_read(self, NwkId, param, device_parameters, HB) elif param in FUNC_MANUF: - _FEQ = self.ListOfDevices[NwkId]["Param"][param] // HEARTBEAT + _FEQ = device_parameters[param] // HEARTBEAT if _FEQ == 0: # Disable continue self.log.logging( "Heartbeat", "Debug", "++ pollingManufSpecificDevices - %s Found: %s=%s HB: %s FEQ: %s Cycle: %s" % ( - NwkId, param, self.ListOfDevices[NwkId]["Param"][param], HB, _FEQ, (HB % _FEQ)), NwkId, ) + NwkId, param, device_parameters[param], HB, _FEQ, (HB % _FEQ)), NwkId, ) if _FEQ and ((HB % _FEQ) != 0): continue self.log.logging( "Heartbeat", "Debug", "++ pollingManufSpecificDevices - %s Found: %s=%s" % ( - NwkId, param, self.ListOfDevices[NwkId]["Param"][param]), NwkId, ) + NwkId, param, device_parameters[param]), NwkId, ) func = FUNC_MANUF[param] func(self, NwkId) @@ -393,6 +360,8 @@ def pollingManufSpecificDevices(self, NwkId, HB): return False + + def pollingDeviceStatus(self, NwkId): # """ # Purpose is to trigger ReadAttrbute 0x0006 and 0x0008 on attribute 0x0000 if applicable diff --git a/Modules/paramDevice.py b/Modules/paramDevice.py index e2a212ea2..47d548098 100644 --- a/Modules/paramDevice.py +++ b/Modules/paramDevice.py @@ -22,58 +22,69 @@ from Modules.onoff_settings import ONOFF_DEVICE_PARAMETERS from Modules.philips import PHILIPS_DEVICE_PARAMETERS from Modules.schneider_wiser import SCHNEIDER_DEVICE_PARAMETERS +from Modules.thermo_settings import THERMOSTAT_DEVICE_PARAMETERS from Modules.tuya import TUYA_DEVICE_PARAMETERS from Modules.tuyaSiren import TUYA_SIREN_DEVICE_PARAMETERS from Modules.tuyaTRV import TUYA_TRV_DEVICE_PARAMETERS from Modules.tuyaTS011F import TUYA_TS011F_DEVICE_PARAMETERS from Modules.tuyaTS0601 import ts0601_extract_data_point_infos, ts0601_settings + def initialize_device_settings(self): + """Initializes device settings by loading general and manufacturer-specific parameters.""" self.device_settings = {} - - # Load specific settings - self.device_settings.update(ONOFF_DEVICE_PARAMETERS) - self.device_settings.update(OCCUPANCY_DEVICE_PARAMETERS) - self.device_settings.update(IAS_DEVICE_PARAMETERS) - self.device_settings.update(BALLAST_DEVICE_PARAMETERS) - - # Load Manufacturer specific settings - self.device_settings.update(DANFOSS_DEVICE_PARAMETERS) - - self.device_settings.update(LEGRAND_DEVICE_PARAMETERS) - - self.device_settings.update(LUMI_DEVICE_PARAMETERS) - - self.device_settings.update(PHILIPS_DEVICE_PARAMETERS) - self.device_settings.update(SONOFF_DEVICE_PARAMETERS) + # General device parameters + general_parameters = [ + ONOFF_DEVICE_PARAMETERS, + OCCUPANCY_DEVICE_PARAMETERS, + IAS_DEVICE_PARAMETERS, + BALLAST_DEVICE_PARAMETERS, + THERMOSTAT_DEVICE_PARAMETERS, + ] + + # Manufacturer-specific device parameters + manufacturer_parameters = [ + DANFOSS_DEVICE_PARAMETERS, + LEGRAND_DEVICE_PARAMETERS, + LUMI_DEVICE_PARAMETERS, + PHILIPS_DEVICE_PARAMETERS, + SONOFF_DEVICE_PARAMETERS, + SUNRICHER_DEVICE_PARAMETERS, + TUYA_DEVICE_PARAMETERS, + TUYA_TS011F_DEVICE_PARAMETERS, + TUYA_TRV_DEVICE_PARAMETERS, + TUYA_SIREN_DEVICE_PARAMETERS, + SCHNEIDER_DEVICE_PARAMETERS, + ] + + # Update device settings in a single loop + for param_group in general_parameters + manufacturer_parameters: + self.device_settings.update(param_group) - self.device_settings.update(SUNRICHER_DEVICE_PARAMETERS) - - self.device_settings.update(TUYA_DEVICE_PARAMETERS) - self.device_settings.update(TUYA_TS011F_DEVICE_PARAMETERS) - self.device_settings.update(TUYA_TRV_DEVICE_PARAMETERS) - self.device_settings.update(TUYA_SIREN_DEVICE_PARAMETERS) - - self.device_settings.update(SCHNEIDER_DEVICE_PARAMETERS) def sanity_check_of_param(self, NwkId): + """Performs a sanity check on device parameters and applies relevant settings.""" + + self.log.logging("Heartbeat", "Debug", f"sanity_check_of_param {NwkId}") - self.log.logging("Heartbeat", "Debug", f"sanity_check_of_param {NwkId}") + device_data = self.ListOfDevices.get(NwkId, {}) + param_data = device_data.get("Param", {}) + model_name = device_data.get("Model", "") - param_data = self.ListOfDevices.get(NwkId, {}).get("Param", {}) - model_name = self.ListOfDevices.get(NwkId, {}).get("Model", "") + dps_mapping = ts0601_extract_data_point_infos(self, model_name) for param, value in param_data.items(): - self.log.logging("Heartbeat", "Debug", f"sanity_check_of_param {param}, {value}") - - dps_mapping = ts0601_extract_data_point_infos( self, model_name) + self.log.logging("Heartbeat", "Debug", f"Checking param: {param}, Value: {value}") + if dps_mapping: - ts0601_settings( self, NwkId, dps_mapping, param, value) + ts0601_settings(self, NwkId, dps_mapping, param, value) + continue + + param_setting = self.device_settings.get(param) - elif param in self.device_settings: - if callable( self.device_settings[param] ): - self.device_settings[param](self, NwkId, value) + if callable(param_setting): + param_setting(self, NwkId, value) - elif "callable" in self.device_settings[param]: - self.device_settings[param]["callable"](self, NwkId, value) + elif isinstance(param_setting, dict) and "callable" in param_setting and callable(param_setting["callable"]): + param_setting["callable"](self, NwkId, value) diff --git a/Modules/readAttributes.py b/Modules/readAttributes.py index 68916fa26..fb2ad0a44 100644 --- a/Modules/readAttributes.py +++ b/Modules/readAttributes.py @@ -121,6 +121,18 @@ def ReadAttributeReq( self, addr, EpIn, EpOut, Cluster, ListOfAttributes, manufa normalizedReadAttributeReq(self, addr, EpIn, EpOut, Cluster, shortlist, manufacturer_spec, manufacturer, ackIsDisabled) +def read_manufacturer_specific_attributes(self, nwkid, ep_out, cluster): + """ Request a Read Attributes of Manufacturer specific attributes defined in Config file for this cluster""" + manufacturer_code, manufacturer_attributes = retreive_manufacturer_specifics_attributes(self, nwkid, cluster) + if manufacturer_code and manufacturer_attributes: + # Log the message + attributes_str = " ".join(f"0x{num:04x}" for num in manufacturer_attributes) + self.log.logging("ReadAttributes", "Debug", f"Request Manuf.Specific Attributes for cluster {cluster} for {nwkid} {ep_out} {attributes_str}", nwkid=nwkid) + + # Perform the Request + ReadAttributeReq( self, nwkid, ZIGATE_EP, ep_out, "0201", manufacturer_attributes, manufacturer_spec="01", manufacturer=manufacturer_code, ackIsDisabled=is_ack_tobe_disabled(self, nwkid), checkTime=False, ) + + def split_list(list_in, wanted_parts=1): """ Split the list of attrributes in wanted part @@ -227,22 +239,42 @@ def retreive_ListOfAttributesByCluster(self, key, Ep, cluster): def retreive_attributes_based_on_configuration(self, key, cluster): - if "Model" not in self.ListOfDevices[key]: + model_name = self.ListOfDevices[key].get("Model") + if model_name is None: return None - if self.ListOfDevices[key]["Model"] not in self.DeviceConf: + if model_name not in self.DeviceConf: return None - if "ReadAttributes" not in self.DeviceConf[self.ListOfDevices[key]["Model"]]: + if "ReadAttributes" not in self.DeviceConf[model_name]: return None - if cluster not in self.DeviceConf[self.ListOfDevices[key]["Model"]]["ReadAttributes"]: + if cluster not in self.DeviceConf[model_name]["ReadAttributes"]: return None return [ int(attr, 16) - for attr in self.DeviceConf[self.ListOfDevices[key]["Model"]][ + for attr in self.DeviceConf[model_name][ "ReadAttributes" ][cluster] ] +def retreive_manufacturer_specifics_attributes(self, nwkid, cluster): + model_name = self.ListOfDevices[nwkid].get("Model") + if model_name is None: + return None, None + + if model_name not in self.DeviceConf: + return None, None + + manufacturer_clusters = self.DeviceConf[model_name].get("ManufacturerAttributes") + manufacturer_code = self.DeviceConf[model_name].get("ManufacturerCode") + + if manufacturer_clusters is None or manufacturer_code is None: + return None, None + + if cluster not in manufacturer_clusters: + return None, None + + return manufacturer_code, [ int(attr, 16) for attr in manufacturer_clusters[ cluster ] ] + def retreive_attributes_from_default_device_list(self, key, Ep, cluster): @@ -823,127 +855,80 @@ def ReadAttributeRequest_0201(self, key): # Thermostat self.log.logging("ReadAttributes", "Debug", "ReadAttributeRequest_0201 - Key: %s " % key, nwkid=key) - _model = "Model" in self.ListOfDevices[key] - disableAck = True - if "PowerSource" in self.ListOfDevices[key] and self.ListOfDevices[key]["PowerSource"] == "Battery": - disableAck = False - - ListOfEp = getListOfEpForCluster(self, key, "0201") - for EPout in ListOfEp: - listAttributes = [] - for iterAttr in retreive_ListOfAttributesByCluster(self, key, EPout, "0201"): - if iterAttr not in listAttributes: - listAttributes.append(iterAttr) - - if _model and str(self.ListOfDevices[key]["Model"]).find("Super TR") == 0: - self.log.logging("ReadAttributes", "Debug", "- req Attributes for Super TR", nwkid=key) - listAttributes.append(0x0403) - listAttributes.append(0x0405) - listAttributes.append(0x0406) - listAttributes.append(0x0408) - listAttributes.append(0x0409) + _model_name = self.ListOfDevices[key].get("Model","") + manufacturer = self.ListOfDevices[key].get("Manufacturer") + manufacturer_name = self.ListOfDevices[key].get("Manufacturer Name") + + eps_list = getListOfEpForCluster(self, key, "0201") + for ep_out in eps_list: + attribute_list = [] + for attr in retreive_ListOfAttributesByCluster(self, key, ep_out, "0201"): + if attr not in attribute_list: + attribute_list.append(attr) + + if _model_name.find("Super TR") == 0: + self.log.logging("ReadAttributes", "Debug", "- req Attributes for Super TR", nwkid=key) + attribute_list.append(0x0403) + attribute_list.append(0x0405) + attribute_list.append(0x0406) + attribute_list.append(0x0408) + attribute_list.append(0x0409) # Adjustement before request - listAttrSpecific = [] - listAttrGeneric = [] + attr_spec_list = [] + appt_generic_list = [] manufacturer_code = "0000" - if ( - ("Manufacturer" in self.ListOfDevices[key] and self.ListOfDevices[key]["Manufacturer"] == "105e") - or ( - ("Manufacturer" in self.ListOfDevices[key] and self.ListOfDevices[key]["Manufacturer"] == "113c") - or ( "Manufacturer Name" in self.ListOfDevices[key] and self.ListOfDevices[key]["Manufacturer Name"] == "Schneider Electric") - ) - ): - # We need to break the Read Attribute between Manufacturer specifcs one and teh generic one - if self.ListOfDevices[key]["Manufacturer Name"] == "Schneider Electric": + if manufacturer in ("105e","113c") or manufacturer_name in ("Schneider Electric", "OWON", "CASAIA"): + # We need to break the Read Attribute between Manufacturer specifcs one and the generic one + if manufacturer_name == "Schneider Electric": manufacturer_code = "105e" - elif self.ListOfDevices[key]["Manufacturer Name"] in ("OWON", "CASAIA"): + elif manufacturer_name in ("OWON", "CASAIA"): manufacturer_code = "113c" - for _attr in list(listAttributes): + for _attr in list(attribute_list): if _attr in (0xE011, 0x0E20, 0xFD00): - listAttrSpecific.append(_attr) + appt_generic_list.append(_attr) else: - listAttrGeneric.append(_attr) - del listAttributes - listAttributes = listAttrGeneric + appt_generic_list.append(_attr) + del attribute_list + attribute_list = appt_generic_list - if ("Manufacturer" in self.ListOfDevices[key] and self.ListOfDevices[key]["Manufacturer"] == "1246") or ( - "Manufacturer Name" in self.ListOfDevices[key] and self.ListOfDevices[key]["Manufacturer Name"] == "Danfoss" - ): + elif manufacturer == "1246" or manufacturer_name == "Danfoss": manufacturer_code = "1246" - for _attr in list(listAttributes): + for _attr in list(attribute_list): if _attr in (0x4000, 0x4010, 0x4011, 0x4015, 0x4020): - listAttrSpecific.append(_attr) + attr_spec_list.append(_attr) else: - listAttrGeneric.append(_attr) - del listAttributes - listAttributes = listAttrGeneric + appt_generic_list.append(_attr) + del attribute_list + attribute_list = appt_generic_list - if listAttributes: - # self.log.logging( "ReadAttributes", 'Debug', "Request 0201 %s/%s 0201 %s " %(key, EPout, listAttributes), nwkid=key) - self.log.logging( - "ReadAttributes", - "Debug", - "Request Thermostat via Read Attribute request %s/%s " % (key, EPout) - + " ".join("0x{:04x}".format(num) for num in listAttributes), - nwkid=key, - ) - ReadAttributeReq( - self, - key, - ZIGATE_EP, - EPout, - "0201", - listAttributes, - ackIsDisabled=is_ack_tobe_disabled(self, key), - checkTime=False, - ) + if attribute_list: + self.log.logging( "ReadAttributes", "Debug", "Request Thermostat via Read Attribute request %s/%s " % (key, ep_out) + " ".join("0x{:04x}".format(num) for num in attribute_list), nwkid=key, ) + ReadAttributeReq( self, key, ZIGATE_EP, ep_out, "0201", attribute_list, ackIsDisabled=is_ack_tobe_disabled(self, key), checkTime=False, ) - if listAttrSpecific: - # self.log.logging( "ReadAttributes", 'Debug', "Request Thermostat info via Read Attribute request Manuf Specific %s/%s %s" %(key, EPout, str(listAttrSpecific)), nwkid=key) - self.log.logging( - "ReadAttributes", - "Debug", - "Request Thermostat via Read Attribute request Manuf Specific %s/%s " % (key, EPout) - + " ".join("0x{:04x}".format(num) for num in listAttrSpecific), - nwkid=key, - ) - ReadAttributeReq( - self, - key, - ZIGATE_EP, - EPout, - "0201", - listAttrSpecific, - manufacturer_spec="01", - manufacturer=manufacturer_code, - ackIsDisabled=is_ack_tobe_disabled(self, key), - checkTime=False, - ) + if attr_spec_list: + self.log.logging( "ReadAttributes", "Debug", "Request Thermostat via Read Attribute request Manuf Specific %s/%s " % (key, ep_out) + " ".join("0x{:04x}".format(num) for num in attr_spec_list), nwkid=key, ) + ReadAttributeReq( self, key, ZIGATE_EP, ep_out, "0201", attr_spec_list, manufacturer_spec="01", manufacturer=manufacturer_code, ackIsDisabled=is_ack_tobe_disabled(self, key), checkTime=False, ) + read_manufacturer_specific_attributes(self, key, ep_out, "0201") -def ReadAttributeRequest_0201_0012(self, key): - self.log.logging("ReadAttributes", "Debug", "ReadAttributeRequest_0201 - Key: %s " % key, nwkid=key) - _model = False - if "Model" in self.ListOfDevices[key]: - _model = True - disableAck = True - if "PowerSource" in self.ListOfDevices[key] and self.ListOfDevices[key]["PowerSource"] == "Battery": - disableAck = False +def ReadAttributeRequest_0201_0012(self, key): + """ Request attribute 0x0012 (Occupied Setpoint)""" - ListOfEp = getListOfEpForCluster(self, key, "0201") - for EPout in ListOfEp: - listAttributes = [0x0012] + self.log.logging("ReadAttributes", "Debug", "ReadAttributeRequest_0201 / 0x0012 - Key: %s " % key, nwkid=key) + eps_list = getListOfEpForCluster(self, key, "0201") + for EPout in eps_list: - if "0201" in self.ListOfDevices[key]["Ep"][EPout] and "0010" in self.ListOfDevices[key]["Ep"][EPout]["0201"]: - listAttributes.append(0x0010) + cluster_0201 = self.ListOfDevices.get(key, {}).get("Ep", {}).get(EPout, {}).get("0201") + if cluster_0201: + attributes_list = [0x0012,] - ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0201", listAttributes, ackIsDisabled=is_ack_tobe_disabled(self, key)) + ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0201", attributes_list, ackIsDisabled=is_ack_tobe_disabled(self, key)) def ReadAttributeRequest_0202(self, key): @@ -1368,10 +1353,11 @@ def ReadAttributeRequest_0702_multiplier_divisor(self, key): self.log.logging("ReadAttributes", "Debug", "Request ReadAttributeRequest_0702 requesting Multiplier/Divisor" + key + " EPout = " + EPout, nwkid=key) ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0702", listAttributes, ackIsDisabled=is_ack_tobe_disabled(self, key)) + def ReadAttributeReq_ZLinky(self, nwkid): EPout = "01" - self.log.logging("ReadAttributes", "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout, nwkid=nwkid) + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout, nwkid=nwkid) for cluster in ( "0702", "0b01", "0b04" ): self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttributeReq_ZLinky: " + nwkid + " EPout = " + EPout + " Cluster = " + cluster, nwkid=nwkid) @@ -1390,18 +1376,19 @@ def ReadAttributeReq_Scheduled_ZLinky(self, nwkid): WORK_TO_BE_DONE = { "ff66": [ 0x0001, # Couleur du lendemain - 0x0217, # STGE + 0x0200, # LTARF ( Libellé tarif fournisseur en cours) + 0x0217, # STGE to get next day color for Standard ], "0702": [ 0x0020, # Periode Tarifaire en cours ] } - EPout = "01" + ep_out = "01" for cluster in WORK_TO_BE_DONE: - self.log.logging(["ReadAttributes", "ZLinky"], "Log", "ReadAttributeReq_Scheduled_ZLinky: %s cluster %s attribute: %s" %( + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttributeReq_Scheduled_ZLinky: %s cluster %s attribute: %s" %( nwkid, cluster, WORK_TO_BE_DONE[ cluster ]), nwkid=nwkid) - ReadAttributeReq(self, nwkid, ZIGATE_EP, EPout, cluster, WORK_TO_BE_DONE[ cluster ], ackIsDisabled=False) + ReadAttributeReq(self, nwkid, ZIGATE_EP, ep_out, cluster, WORK_TO_BE_DONE[ cluster ], ackIsDisabled=False) def ReadAttributeRequest_0702_ZLinky_TIC(self, key): @@ -1423,7 +1410,7 @@ def ReadAttributeRequest_0702_ZLinky_TIC(self, key): else: listAttributes = [0x0020, 0x0100, 0x0102, 0x0104, 0x0106, 0x0108, 0x10A] - self.log.logging(["ReadAttributes", "ZLinky"], "ZLinky", "Request ZLinky infos on 0x0702 cluster: " + key + " EPout = " + EPout, nwkid=key) + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "Request ZLinky infos on 0x0702 cluster: " + key + " EPout = " + EPout, nwkid=key) ReadAttributeReq(self, key, ZIGATE_EP, EPout, "0702", listAttributes, ackIsDisabled=False) @@ -1438,9 +1425,9 @@ def ReadAttribute_ZLinkyIndex( self, nwkid ): } EPout = "01" - self.log.logging("ZLinky", "Debug", "ReadAttribute_ZLinkyIndex: " + nwkid + " EPout = " + EPout, nwkid=nwkid) + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttribute_ZLinkyIndex: " + nwkid + " EPout = " + EPout, nwkid=nwkid) optarif = get_OPTARIF( self, nwkid)[:2] - self.log.logging("ZLinky", "Debug", "ReadAttribute_ZLinkyIndex: %s Cluster: %s Attributes: %s Optarif: %s" %( + self.log.logging(["ReadAttributes", "ZLinky"], "Debug", "ReadAttribute_ZLinkyIndex: %s Cluster: %s Attributes: %s Optarif: %s" %( nwkid, "0702", INDEX_ATTRIBUTES[ optarif ], optarif), nwkid=nwkid) if optarif in INDEX_ATTRIBUTES: ReadAttributeReq(self, nwkid, ZIGATE_EP, EPout, "0702", INDEX_ATTRIBUTES[ optarif ], ackIsDisabled=False) diff --git a/Modules/thermo_settings.py b/Modules/thermo_settings.py new file mode 100644 index 000000000..81c3b1873 --- /dev/null +++ b/Modules/thermo_settings.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Implementation of Zigbee for Domoticz plugin. +# +# This file is part of Zigbee for Domoticz plugin. https://github.com/zigbeefordomoticz/Domoticz-Zigbee +# (C) 2015-2024 +# +# Initial authors: zaraki673 & pipiche38 +# +# SPDX-License-Identifier: GPL-3.0 license + +from Modules.basicOutputs import write_attribute +from Modules.readAttributes import ReadAttributeReq +from Modules.tools import getListOfEpForCluster +from Modules.zigateConsts import ZIGATE_EP + +THERMOSTAT_CLUSTER = "0201" + +THERMOSTAT_CONFIG_SET = { + "MinHeatingSetpoint": ( "0015", "29"), + "MaxHeatingSetpoint": ( "0016", "29"), +} + + +def max_heating_setpoint(self, nwkid, value): + self.log.logging( "thermoSettings", "Debug", f"max_heating_setpoint for {nwkid} - value: {value}", nwkid ) + for ep in getListOfEpForCluster(self, nwkid, THERMOSTAT_CLUSTER): + write_attribute( + self, + nwkid, + ZIGATE_EP, + ep, + THERMOSTAT_CLUSTER, + "0000", + "00", + THERMOSTAT_CONFIG_SET[ "MaxHeatingSetpoint"][0], + THERMOSTAT_CONFIG_SET[ "MaxHeatingSetpoint"][1], + "%04x" %value, + ackIsDisabled=False, ) + ReadAttributeReq( self, nwkid, ZIGATE_EP, ep, THERMOSTAT_CLUSTER, [ int(THERMOSTAT_CONFIG_SET[ "MaxHeatingSetpoint"][0],16) ], ackIsDisabled=False, checkTime=False, ) + + + +def min_heating_setpoint(self, nwkid, value): + self.log.logging( "thermoSettings", "Debug", f"mix_heating_setpoint for {nwkid} - value: {value}", nwkid ) + + for ep in getListOfEpForCluster(self, nwkid, THERMOSTAT_CLUSTER): + write_attribute( + self, + nwkid, + ZIGATE_EP, + ep, + THERMOSTAT_CLUSTER, + "0000", + "00", + THERMOSTAT_CONFIG_SET[ "MinHeatingSetpoint"][0], + THERMOSTAT_CONFIG_SET[ "MinHeatingSetpoint"][1], + "%04x" %value, + ackIsDisabled=False, ) + ReadAttributeReq( self, nwkid, ZIGATE_EP, ep, THERMOSTAT_CLUSTER, [ int(THERMOSTAT_CONFIG_SET[ "MinHeatingSetpoint"][0],16) ], ackIsDisabled=False, checkTime=False, ) + + +THERMOSTAT_DEVICE_PARAMETERS = { + "MaxHeatingSetpoint": { "callable": max_heating_setpoint, "description": "Specifies the maximum level that the heating setpoint may be set to, in range of 8° - 28.5"}, + "MinHeatingSetpoint": { "callable": min_heating_setpoint, "description": "Specifies the minimum level that the heating setpoint may be set to, in range of 7.5° - 28°"} +} \ No newline at end of file diff --git a/Modules/timeServer.py b/Modules/timeServer.py index 4dc9b59a6..ccd25288c 100644 --- a/Modules/timeServer.py +++ b/Modules/timeServer.py @@ -11,97 +11,168 @@ # SPDX-License-Identifier: GPL-3.0 license - -from datetime import datetime +import os +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo from Modules.basicInputs import read_attribute_response +from Modules.sendZigateCommand import raw_APS_request +from Modules.tools import is_ack_tobe_disabled +from Modules.zigateConsts import ZIGATE_EP +ZIGBEE_EPOCH = datetime(2000, 1, 1, 0, 0, 0, 0, tzinfo=timezone.utc) +TUYA_EPOCTime = datetime(1970, 1, 1, 0, 0, 0, 0) -def timeserver_read_attribute_request(self, sqn, nwkid, ep, cluster, manuf_spec, manuf_code, attribute): - self.log.logging( - "Input", - "Debug", - "timeserver_read_attribute_request [%s] %s/%s Cluster: %s Attribute: %s" % (sqn, nwkid, ep, cluster, attribute), - ) - data_type = value = None - status = "86" - - if "SQN_000a" in self.ListOfDevices[nwkid] and sqn == self.ListOfDevices[nwkid]["SQN_000a"]: - # Duplicate - self.log.logging( - "Input", - "Debug", - "timeserver_read_attribute_request [%s] %s/%s Cluster: %s Attribute: %s already processed" - % (sqn, nwkid, ep, cluster, attribute), - ) +def get_local_timezone(): + """ + Get the local time zone name from the system environment. + + :return: The name of the local time zone. + """ + return os.environ.get('TZ', 'UTC') or 'UTC' + + + +def calculate_dst_times(): + """ + Calculate the DST start and end times for the current year in the local time zone. + + :return: Tuple containing DST start time, DST end time, and DST shift in seconds. + """ + # Get the local time zone name + local_tz_name = get_local_timezone() + local_tz = ZoneInfo(local_tz_name) + + # Get the current year + current_year = datetime.now().year + + # Find the DST start and end times for the current year + dst_start = None + dst_end = None + + # Iterate over the year to find DST transitions + for month in range(1, 13): + for day in range(1, 32): + try: + date = datetime(current_year, month, day, tzinfo=local_tz) + if date.dst() != timedelta(0) and dst_start is None: + dst_start = date + elif date.dst() == timedelta(0) and dst_start is not None: + dst_end = date + break + except ValueError: + # Handle invalid dates (e.g., February 30) + break + if dst_end: + break + + # Calculate the DST shift in seconds + dst_shift = int(dst_start.dst().total_seconds()) + + # Convert DST start and end times to UTC + dst_start_utc = int(dst_start.timestamp()) + dst_end_utc = int(dst_end.timestamp()) + + return dst_start_utc, dst_end_utc, dst_shift + + +def timeserver_multiple_read_attribute_request(self, Devices, nwkid, src_ep, dst_ep, sqn, cluster_id, manuf_specif, manuf_code, MsgData, nbAttribute): + """ Handle a read request with multiple attributes on cluster 0x000a""" + + if sqn == self.ListOfDevices[nwkid].get("SQN_000a"): + self.log.logging(["TimeServer","Input"], "Debug", f"Decode0100 - timeserver_multiple_read_attribute_request - duplicate request nwkid {nwkid} nbAttribute: {nbAttribute}", nwkid) return + self.ListOfDevices[nwkid]["SQN_000a"] = sqn + + payload = None + cmd = "01" + status = "00" + cluster_frame = "18" - if attribute == "0000": # Time ( - self.log.logging("Input", "Debug", "-->Local Time: %s" % datetime.now()) - EPOCTime = datetime(2000, 1, 1, 0, 0, 0, 0) - UTCTime = int((datetime.now() - EPOCTime).total_seconds()) - value = "%08x" % UTCTime - data_type = "e2" # UTC Type - status = "00" + # Extract all attributes, and build a response + for idx in range(0, len(MsgData), 4): + attribute = MsgData[idx:idx + 4] + self.log.logging(["TimeServer","Input"], "Debug", f"Decode0100 - timeserver_multiple_read_attribute_request - nwkid {nwkid} attribute {attribute}", nwkid) - elif attribute == "0001": # Time status - self.log.logging("Input", "Debug", "-->Time Status: %s" % 0b00001100) - value = "%02x" % 0x07 # Time Status: 0x07, Master, Synchronized, Master for Time Zone and DST - data_type = "18" # map8 - status = "00" + # Handle different cluster IDs and attributes + status, data_type, value = get_response_data_for_timer_attribute_request(self, nwkid, attribute) + self.log.logging(["TimeServer","Input"], "Debug", f"Decode0100 - timeserver_multiple_read_attribute_request - response {data_type} {value}", nwkid) + if payload is None: + payload = cluster_frame + sqn + cmd + payload += attribute[2:4] + attribute[:2] + status + data_type + payload += value[2:4] + value[:2] if data_type == "29" else value + + self.log.logging(["TimeServer","Input"], "Debug", f"Decode0100 - timeserver_multiple_read_attribute_request Response - nwkid {nwkid} ep: {src_ep} , clusterId: {cluster_id}, sqn: {sqn},payload: {payload}", nwkid) + + raw_APS_request( self, nwkid, src_ep, cluster_id, "0104", payload, zigate_ep=ZIGATE_EP, ackIsDisabled=is_ack_tobe_disabled(self, nwkid), ) - elif attribute == "0002": # Timezone - diff = datetime.fromtimestamp(86400) - datetime.utcfromtimestamp(86400) - self.log.logging("Input", "Debug", "--> TimeZone %s" % int(diff.total_seconds())) - value = "%08x" % int(diff.total_seconds()) - data_type = "2b" # int32 - status = "00" - elif attribute == "0003": # Day Light saving Start - self.log.logging("Input", "Debug", "--> DstStart %0x" % 0x00000000) - value = "%08x" % 0xffffffff - data_type = "23" # unint32 - status = "00" - - elif attribute == "0004": # Day Light saving time End - self.log.logging("Input", "Debug", "--> DstEnd %0x" % 0x00000000) - value = "%08x" % 0xffffffff - data_type = "23" # unint32 - status = "00" - - elif attribute == "0005": # Day light saving shift - self.log.logging("Input", "Debug", "--> DstShift %0x" % 0x00000000) - value = "%08x" % 0x0 - data_type = "2B" # int32 - status = "00" +def get_response_data_for_timer_attribute_request( self, nwkid, attribute): + # Default values + data_type = None + value = None + status = "86" # Default to unsupported attribute - elif attribute == "0006": # StandardTime - self.log.logging("Input", "Debug", "--> StandardTime %0x" % 0x00000000) - value = "%08x" % 0x00000000 - data_type = "23" # unint32 - status = "00" + now = datetime.now(timezone.utc) + + attribute_map = { + "0000": {"value": f"{int((now - ZIGBEE_EPOCH).total_seconds()):08x}", "data_type": "e2", "status": "00"}, # Time + "0001": {"value": f"{0x03:02x}", "data_type": "18", "status": "00"}, # Time Status + "0002": { + "value": f"{int(datetime.now().astimezone().utcoffset().total_seconds() if datetime.now().astimezone().utcoffset() else 0):08x}", + "data_type": "2b", + "status": "00", + }, # Timezone + "0003": {"value": f"{calculate_dst_times()[0]:08x}", "data_type": "23", "status": "00"}, # DST Start + "0004": {"value": f"{calculate_dst_times()[1]:08x}", "data_type": "23", "status": "00"}, # DST End + "0005": {"value": f"{calculate_dst_times()[2]:08x}", "data_type": "2B", "status": "00"}, # DST Shift + "0006": {"value": "00000000", "data_type": "23", "status": "00"}, # Standard Time + } + + if attribute in attribute_map: + value = attribute_map[attribute]["value"] + data_type = attribute_map[attribute]["data_type"] + status = attribute_map[attribute]["status"] elif attribute == "0007": # LocalTime - self.log.logging("Input", "Debug", "-->Local Time: %s" % datetime.now()) - if "Model" in self.ListOfDevices[nwkid] and self.ListOfDevices[nwkid]["Model"] == "TS0601-thermostat": + self.log.logging(["TimeServer","Input"], "Debug", f"-->Local Time: {datetime.now()}") + + epoch = TUYA_EPOCTime if self.ListOfDevices.get(nwkid, {}).get("Model") == "TS0601-thermostat" else ZIGBEE_EPOCH + if epoch == TUYA_EPOCTime: self.log.logging( - "Input", + ["TimeServer","Input"], "Debug", - "timeserver_read_attribute_request Response use EPOCH from 1970,1,1 instead of 2000,1,1", + "timeserver_read_attribute_request Response uses EPOCH from 1970-01-01 instead of 2000-01-01", ) - EPOCTime = datetime(1970, 1, 1, 0, 0, 0, 0) - else: - EPOCTime = datetime(2000, 1, 1, 0, 0, 0, 0) - UTCTime = int((datetime.now() - EPOCTime).total_seconds()) - value = "%08x" % UTCTime + + tz_offset = datetime.now().astimezone().utcoffset() or timedelta(seconds=0) + local_time = int((now + tz_offset - epoch).total_seconds()) + value = f"{local_time:08x}" data_type = "23" # uint32 status = "00" - + + return status, data_type, value + + +def timeserver_read_attribute_request(self, sqn, nwkid, ep, cluster, manuf_spec, manuf_code, attribute): + """Handles reading various time-related attributes for a Zigbee timeserver.""" + + self.log.logging( + ["TimeServer","Input"], + "Debug", + f"timeserver_read_attribute_request [{sqn}] {nwkid}/{ep} Cluster: {cluster} Attribute: {attribute}" + ) + + self.ListOfDevices[nwkid]["SQN_000a"] = sqn + + status, data_type, value = get_response_data_for_timer_attribute_request(self, nwkid, attribute) + self.log.logging( - "Input", + ["TimeServer","Input"], "Debug", - "timeserver_read_attribute_request Response: status: %s attribute: %s value: %s" % (status, attribute, value), + f"timeserver_read_attribute_request Response: status: {status} attribute: {attribute} value: {value}" ) + read_attribute_response(self, nwkid, ep, sqn, cluster, status, data_type, attribute, value, manuf_code="0000") diff --git a/Modules/zlinky.py b/Modules/zlinky.py index fd96ecd57..c4021800b 100644 --- a/Modules/zlinky.py +++ b/Modules/zlinky.py @@ -15,6 +15,15 @@ from Modules.pluginDbAttributes import (STORE_CONFIGURE_REPORTING, STORE_READ_CONFIGURE_REPORTING) +ATTR_ZLINKY = "ZLinky" +ATTR_PROTO_LINKY = "PROTOCOL Linky" + +ISOUSC_THRESHOLD_MAX = 98 +ISOUSC_THRESHOLD_MED = 90 + +TENSION_NOMINAL = 230 +KILO = 1000 + ZLINK_CONF_MODEL = ( "ZLinky_TIC", "ZLinky_TIC-historique-mono" , "ZLinky_TIC-historique-tri", @@ -31,23 +40,13 @@ 7: { "Mode": ('standard', 'tri prod'), "Conf": "ZLinky_TIC-standard-tri-prod" }, } -ZLINKY_UPGRADE_PATHS = { - "ZLinky_TIC": ( - "ZLinky_TIC-historique-mono", - "ZLinky_TIC-historique-tri", - "ZLinky_TIC-standard-mono", - "ZLinky_TIC-standard-mono-prod", - "ZLinky_TIC-standard-tri", - "ZLinky_TIC-standard-tri-prod" - ), - "ZLinky_TIC-historique-mono": ( - "ZLinky_TIC-standard-mono", - "ZLinky_TIC-standard-mono-prod", - ), - "ZLinky_TIC-historique-tri": ( - "ZLinky_TIC-standard-tri", - "ZLinky_TIC-standard-tri-prod" - ), +ZLINKY_UPGRADE_PATHS = { + "ZLinky_TIC": ( + "ZLinky_TIC-historique-mono", "ZLinky_TIC-historique-tri", + "ZLinky_TIC-standard-mono", "ZLinky_TIC-standard-mono-prod", + "ZLinky_TIC-standard-tri", "ZLinky_TIC-standard-tri-prod"), + "ZLinky_TIC-historique-mono": ( "ZLinky_TIC-standard-mono", "ZLinky_TIC-standard-mono-prod"), + "ZLinky_TIC-historique-tri": ( "ZLinky_TIC-standard-tri", "ZLinky_TIC-standard-tri-prod" ), "ZLinky_TIC-standard-mono-prod": (), "ZLinky_TIC-standard-tri": (), "ZLinky_TIC-standard-tri-prod": (), @@ -96,55 +95,91 @@ "0226": "NJOURF+1", "0227": "PJOURF+1", "0228": "PPOINTE1", - "0300": "PROTOCOL Linky" + "0300": ATTR_PROTO_LINKY } -def convert_kva_to_ampere( kva ): - return ( kva * 1000) / 200 +OP_TARIFAIRE_MAP = { + "BASE": (0, "All Hours"), + + "TH..": (0, "All Hours"), + + "HC..": (1, "Off-peak Hours"), + "HP..": (2, "Peak Hours"), + + "HN..": (1, "Normal Hours"), + "EJPHN": (1, "Normal Hours"), + "PM..": (4, "Mobile Peak Hours"), + "EJPHPM": (4, "Mobile Peak Hours"), + + "BHC": (1, "Bleu HC"), + "BHP": (1, "Bleu HP"), + "HCJB": (1, "Bleu HC"), + "HPJB": (1, "Bleu HP"), + + "WHC": (2, "Blanc HC"), + "WHP": (2, "Blanc HP"), + "HCJW": (2, "Blanc HC"), + "HPJW": (2, "Blanc HP"), + + "RHC": (4, "Rouge HC"), + "RHP": (4, "Rouge HP"), + "HCJR": (4, "Rouge HC"), + "HPJR": (4, "Rouge HP"), + +} + + +def get_notification_day_color(value): + """Determine the numeric and string representation of the day color and peak status for Domoticz UpdateDevice()""" -def zlinky_color_tarif(self, MsgSrcAddr, color): - self.ListOfDevices.setdefault(MsgSrcAddr, {}).setdefault("ZLinky", {})["Color"] = color + # Check if value exists in color_map + if value in OP_TARIFAIRE_MAP: + return OP_TARIFAIRE_MAP[value] + # Fallback for values starting with B/W/R + color_prefix_map = { + "B": [ 1, "Blue Hours"], + "W": [ 2, "White Hours"], + "R": [ 4, "Red Hours"] + } + + return color_prefix_map.get(value[0], (3, "Unknown")) # Default to (3, "Unknown") if no match -def store_ZLinky_infos( self, nwkid, command_tic, value): - if 'ZLinky' not in self.ListOfDevices[ nwkid ]: - self.ListOfDevices[ nwkid ][ 'ZLinky' ] = {} - self.ListOfDevices[ nwkid ][ 'ZLinky' ][ command_tic ] = value + +def convert_kva_to_ampere( kva ): + return ( kva * KILO) / TENSION_NOMINAL -def get_ISOUSC( self, nwkid ): +def zlinky_color_tarif(self, nwkid, color): + self.ListOfDevices.setdefault(nwkid, {}).setdefault(ATTR_ZLINKY, {})["Color"] = color - if ( - "ZLinky" in self.ListOfDevices[nwkid] - and "ISOUSC" in self.ListOfDevices[nwkid]["ZLinky"] - ): - return self.ListOfDevices[nwkid]["ZLinky"]["ISOUSC"] - ampere = False - if ( - "ZLinky" in self.ListOfDevices[nwkid] - and "PROTOCOL Linky" in self.ListOfDevices[nwkid]["ZLinky"] - and self.ListOfDevices[nwkid]["ZLinky"]["PROTOCOL Linky"] in (0, 2) - ): - # We are in Historique mode , so value is given in Ampere - ampere = True +def store_ZLinky_infos(self, nwkid, command_tic, value): + zlinky_dict = self.ListOfDevices[nwkid].setdefault('ZLinky', {}) + zlinky_dict[command_tic] = value - # Let's check if we have in the Ep values - if ( - "Ep" in self.ListOfDevices[nwkid] - and "01" in self.ListOfDevices[nwkid]["Ep"] - and "0b01" in self.ListOfDevices[nwkid]["Ep"]["01"] - and "000d" in self.ListOfDevices[nwkid]["Ep"]["01"]["0b01"] - ): - if ampere: - return self.ListOfDevices[nwkid]["Ep"]["01"]["0b01"]["000d"] +def get_ISOUSC(self, nwkid): + # Retrieve 'ISOUSC' if it exists + if 'ISOUSC' in self.ListOfDevices[nwkid]: + return self.ListOfDevices[nwkid]['ISOUSC'] - return convert_kva_to_ampere( self.ListOfDevices[nwkid]["Ep"]["01"]["0b01"]["000d"] ) + # Retrieve 'PROTOCOL Linky' from 'ZLinky' if it exists + protocol_linky = self.ListOfDevices[nwkid].get('ZLinky', {}).get('PROTOCOL Linky') + + # Retrieve '000d' from nested dictionaries in 'Ep' + isousc_from_cluster_0b01 = self.ListOfDevices[nwkid].get('Ep', {}).get('01', {}).get('0b01', {}).get('000d', 0) + + if isousc_from_cluster_0b01: + # Check protocol and convert if necessary + if protocol_linky in (0, 2): + return isousc_from_cluster_0b01 + return convert_kva_to_ampere(isousc_from_cluster_0b01) return 0 + def get_OPTARIF(self, nwkid): """ Retrieves the 'OPTARIF' value for a given network ID (nwkid) from the 'ZLinky' device data. @@ -159,7 +194,17 @@ def get_OPTARIF(self, nwkid): Returns: str: The cleaned 'OPTARIF' value, or "BASE" if not found. """ - zlinky = self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}) + def _normalize_tarif(op_tarifaire): + """ Normalize Op Tarif """ + if op_tarifaire.startswith("BBR"): + base_tarifaire = "TEMPO" # Treat any BBRx as TEMPO + elif op_tarifaire.startswith("EJP"): + base_tarifaire = "EJP" # Treat any EJPx as EJP + else: + base_tarifaire = op_tarifaire + return base_tarifaire + + zlinky = self.ListOfDevices.get(nwkid, {}).get(ATTR_ZLINKY, {}) # Get the raw value of "OPTARIF", or default to "BASE" optarif_value = zlinky.get("OPTARIF", "BASE") @@ -173,7 +218,7 @@ def get_OPTARIF(self, nwkid): if isinstance(optarif_value, str): optarif_value = optarif_value.replace('\u0000', '').replace('\x00', '').strip() - return optarif_value + return _normalize_tarif(optarif_value) def get_instant_power(self, nwkid): @@ -188,17 +233,18 @@ def get_instant_power(self, nwkid): def get_tarif_color(self, nwkid): - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("Color") + return self.ListOfDevices.get(nwkid, {}).get(ATTR_ZLINKY, {}).get("Color") def get_ptec(self, nwkid): """ Retreive Current Tarif. (Historic)""" - return self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("PTEC") + return self.ListOfDevices.get(nwkid, {}).get(ATTR_ZLINKY, {}).get("PTEC") + def get_ltarf(self, nwkid): """ Retreive Current Tarif. (Standard)""" - _ltarf = self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}).get("LTARF") + _ltarf = self.ListOfDevices.get(nwkid, {}).get(ATTR_ZLINKY, {}).get("LTARF") # If the value is a byte string, decode and clean up if isinstance(_ltarf, bytes): # Decode the byte string to UTF-8, ignoring errors, and remove null bytes @@ -210,53 +256,67 @@ def get_ltarf(self, nwkid): return _ltarf + def zlinky_check_alarm(self, Devices, MsgSrcAddr, MsgSrcEp, value): if value == 0: return "00|Normal" - Isousc = get_ISOUSC( self, MsgSrcAddr ) + isousc = get_ISOUSC( self, MsgSrcAddr ) - if Isousc == 0: + if isousc == 0: return "00|Normal" - flevel = (value * 100) / Isousc - self.log.logging( "Cluster", "Debug", "zlinky_check_alarm - %s/%s flevel- %s %s %s" % (MsgSrcAddr, MsgSrcEp, value, Isousc, flevel), MsgSrcAddr, ) + flevel = (value * 100) / isousc + self.log.logging( "Cluster", "Debug", "zlinky_check_alarm - %s/%s flevel- %s %s %s" % (MsgSrcAddr, MsgSrcEp, value, isousc, flevel), MsgSrcAddr, ) - if flevel > 98: + if flevel > ISOUSC_THRESHOLD_MAX: self.log.logging( "Cluster", "Debug", "zlinky_check_alarm - %s/%s Alarm-01" % (MsgSrcAddr, MsgSrcEp), MsgSrcAddr, ) - return "03|Reach >98 %% of Max subscribe %s" % (Isousc) + return "03|Reach >98 %% of Max subscribe %s" % (isousc) - elif flevel > 90: + elif flevel > ISOUSC_THRESHOLD_MED: self.log.logging( "Cluster", "Debug", "zlinky_check_alarm - %s/%s Alarm-02" % (MsgSrcAddr, MsgSrcEp), MsgSrcAddr, ) - return "02|Reach >90 %% of Max subscribe %s" % (Isousc) + return "02|Reach >90 %% of Max subscribe %s" % (isousc) self.log.logging( "Cluster", "Debug", "zlinky_check_alarm - %s/%s Alarm-03" % (MsgSrcAddr, MsgSrcEp), MsgSrcAddr, ) return "00|Normal" - -def linky_mode( self, nwkid , protocol=False): - - if 'ZLinky' not in self.ListOfDevices[ nwkid ]: - return None - - if 'PROTOCOL Linky' not in self.ListOfDevices[ nwkid ]['ZLinky']: - return get_linky_mode_from_ep(self, nwkid ) - - if self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] in ZLINKY_MODE and not protocol: - return ZLINKY_MODE[ self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] ]["Mode"] - elif protocol: - return self.ListOfDevices[ nwkid ]['ZLinky']['PROTOCOL Linky'] +def linky_mode(self, nwkid, protocol=False): + """Retrieve the Linky mode for a given device.""" + + # Get or set "PROTOCOL Linky" only if it hasn't been set + zlinky_data = self.ListOfDevices.setdefault(nwkid, {}).setdefault(ATTR_ZLINKY, {}) + + if ATTR_PROTO_LINKY not in zlinky_data: + protocol_linky = get_linky_mode_from_ep(self, nwkid) + if protocol_linky is None: + return None # Do nothing if get_linky_mode_from_ep returns None + zlinky_data[ATTR_PROTO_LINKY] = protocol_linky + else: + protocol_linky = zlinky_data[ATTR_PROTO_LINKY] - return None + if protocol: + return protocol_linky # Return protocol name if requested + + return ZLINKY_MODE.get(protocol_linky, {}).get("Mode") def get_linky_mode_from_ep(self, nwkid): - ep = self.ListOfDevices.get(nwkid, {}).get("Ep", {}).get("01", {}).get("ff66", {}).get("0300") - return ep if ep in ZLINKY_MODE else None + """Retrieve the Linky protocol mode from endpoint data.""" + + protocol_linky = ( + self.ListOfDevices + .get(nwkid, {}) + .get("Ep", {}) + .get("01", {}) + .get("ff66", {}) + .get("0300") + ) + + return protocol_linky if protocol_linky in ZLINKY_MODE else None def linky_device_conf(self, nwkid): @@ -287,44 +347,38 @@ def linky_upgrade_authorized( current_model, new_model ): and new_model in ZLINKY_UPGRADE_PATHS[current_model] ) -def update_zlinky_device_model_if_needed( self, nwkid ): - - if "Model" not in self.ListOfDevices[ nwkid ]: + +def update_zlinky_device_model_if_needed(self, nwkid): + """Update ZLinky device model if an upgrade is authorized and necessary.""" + + device_info = self.ListOfDevices.get(nwkid, {}) + model_name = device_info.get("Model") + + if not model_name: return zlinky_conf = linky_device_conf(self, nwkid) - if self.ListOfDevices[ nwkid ]["Model"] != zlinky_conf: - if not linky_upgrade_authorized( self.ListOfDevices[ nwkid ]["Model"], zlinky_conf ): - self.log.logging( "ZLinky", "Log", "Not authorized adjustement ZLinky model from %s to %s" %( - self.ListOfDevices[ nwkid ]["Model"], zlinky_conf )) - return + if not linky_upgrade_authorized(model_name, zlinky_conf): + self.log.logging("ZLinky", "Log", f"Not authorized to adjust ZLinky model from {model_name} to {zlinky_conf}") + return - self.log.logging( "ZLinky", "Status", "Adjusting ZLinky model from %s to %s" %( - self.ListOfDevices[ nwkid ]["Model"], zlinky_conf )) - - # Looks like we have to update the Model in order to use the right attributes - self.ListOfDevices[ nwkid ]["Model"] = zlinky_conf + self.log.logging("ZLinky", "Status", f"Adjusting ZLinky model from {model_name} to {zlinky_conf}") - # Read Attribute has to be redone from scratch - if "ReadAttributes" in self.ListOfDevices[nwkid]: - del self.ListOfDevices[nwkid]["ReadAttributes"] + # Update the model name + device_info["Model"] = zlinky_conf - if 'ZLinky' in self.ListOfDevices[ nwkid ]: - del self.ListOfDevices[ nwkid ]['ZLinky'] + # Remove outdated attributes to trigger a fresh read + for key in ["ReadAttributes", ATTR_ZLINKY, STORE_CONFIGURE_REPORTING, STORE_READ_CONFIGURE_REPORTING]: + device_info.pop(key, None) - # Configure Reporting to be done - if STORE_CONFIGURE_REPORTING in self.ListOfDevices[nwkid]: - del self.ListOfDevices[nwkid][STORE_CONFIGURE_REPORTING] + # Force configuration reporting if enabled + if self.configureReporting: + self.configureReporting.check_configuration_reporting_for_device(nwkid, force=True) + + # Reset heartbeat status + device_info["Heartbeat"] = "-1" - if STORE_READ_CONFIGURE_REPORTING in self.ListOfDevices[nwkid]: - del self.ListOfDevices[nwkid][STORE_READ_CONFIGURE_REPORTING] - - if self.configureReporting: - self.configureReporting.check_configuration_reporting_for_device( nwkid, force=True) - - if "Heartbeat" in self.ListOfDevices[nwkid]: - self.ListOfDevices[nwkid]["Heartbeat"] = "-1" CONTACT_SEC = { 0: "fermé", @@ -372,75 +426,6 @@ def update_zlinky_device_model_if_needed( self, nwkid ): } -#def decode_STEG(stge): -# """ decoding of STGE Linky frame""" -# # Contact Sec : bit 0 -# # Organe de coupure: bits 1 à 3 -# # Etat du cache-bornes distributeur: bit 4 -# # Surtension sur une des phases: bit 6 -# # Dépassement de la puissance de référence bit 7 -# # Fonctionnement produ/conso: bit 8 -# # Sens de l'énégerie active: bit 9 -# # Tarif en cours contrat fourniture: bit 10 à 13 -# # Tarif en cours contrat distributeur: bit 14 et 15 -# # Mode dégradée de l'horloge: bit 16 -# # Etat de sortie tic: bit 17 -# # Etat de sortie Euridis: bit 19 et 20 -# # Statut du CPL: bit 21 et 22 -# # Synchro CPL: bit 23 -# # Couleur du jour: bit 24 et 25 -# # Couleur du lendemain: bit 26 et 27 -# # Préavis points mobiles: bit 28 à 29 -# # Pointe mobile: bit 30 et 31 -# -# try: -# stge = int(stge, 16) -# except ValueError: -# return {} -# -# STEG_ATTRIBUTES = { -# 'contact_sec': stge & 0x00000001, -# 'organe_coupure': (stge & 0x0000000E) >> 1, -# 'etat_cache_bornes': (stge & 0x00000010) >> 4, -# 'sur_tension': (stge & 0x00000040) >> 6, -# 'depassement_puissance': (stge & 0x00000080) >> 7, -# 'mode_fonctionnement': (stge & 0x00000100) >> 8, -# 'sens_energie': (stge & 0x00000200) >> 9, -# 'tarif_fourniture': (stge & 0x0001F000) >> 12, -# 'tarif_distributeur': (stge & 0x00060000) >> 14, -# 'Mode_horloge': (stge & 0x00100000) >> 16, -# 'sortie_tic': (stge & 0x00200000) >> 17, -# 'sortie_euridis': (stge & 0x00C00000) >> 19, -# 'status_cpl': (stge & 0x03000000) >> 21, -# 'synchro_cpl': (stge & 0x08000000) >> 23, -# 'couleur_jour': (stge & 0x30000000) >> 24, -# 'couleur_demain': (stge & 0xC0000000) >> 26, -# 'preavis_point_mobile': (stge & 0x30000000) >> 28, -# 'pointe_mobile': (stge & 0xC0000000) >> 30, -# } -# -# # Decode mapped values -# STEG_ATTRIBUTES_MAPPING = { -# 'contact_sec': CONTACT_SEC, -# 'etat_cache_bornes': ETAT_CACHE_BORNES, -# 'mode_fonctionnement': FONCTION_PROD_CONSO, -# 'sens_energie': SENS_ENERGIE, -# 'Mode_horloge': HORLOGE, -# 'sortie_tic': SORTIE_TIC, -# 'sortie_euridis': SORTIE_EURIDIS, -# 'status_cpl': STATUT_CPL, -# 'synchro_cpl': SYNCHRO_CPL, -# 'couleur_jour': COULEUR, -# 'couleur_demain': COULEUR, -# } -# -# # Decode mapped values for applicable attributes -# for attr, mapping in STEG_ATTRIBUTES_MAPPING.items(): -# if attr in STEG_ATTRIBUTES and STEG_ATTRIBUTES[attr] in mapping: -# STEG_ATTRIBUTES[attr] = mapping[STEG_ATTRIBUTES[attr]] -# -# return STEG_ATTRIBUTES - def decode_STEG(stge): """ Decoding of STGE Linky frame """ @@ -459,32 +444,65 @@ def decode_STEG(stge): ('depassement_puissance', 0x00000080, 7), # bit 7 ('mode_fonctionnement', 0x00000100, 8), # bit 8 ('sens_energie', 0x00000200, 9), # bit 9 - ('tarif_fourniture', 0x0001F000, 12), # bits 10-13 - ('tarif_distributeur', 0x00060000, 14), # bits 14-15 - ('Mode_horloge', 0x00100000, 16), # bit 16 - ('sortie_tic', 0x00200000, 17), # bit 17 - ('sortie_euridis', 0x00C00000, 19), # bits 19-20 - ('status_cpl', 0x03000000, 21), # bits 21-22 - ('synchro_cpl', 0x08000000, 23), # bit 23 - ('couleur_jour', 0x30000000, 24), # bits 24-25 - ('couleur_demain', 0xC0000000, 26), # bits 26-27 - ('preavis_point_mobile', 0x30000000, 28), # bits 28-29 + + ('tarif_fourniture', 0x00003C00, 10), # bits 10-13 + ('tarif_distributeur', 0x0000C000, 14), # bits 14-15 + + ('mode_horloge', 0x00010000, 16), # bit 16 + ('sortie_tic', 0x00020000, 17), # bit 17 + ('sortie_euridis', 0x000C0000, 19), # bits 19-20 + + ('status_cpl', 0x00300000, 21), # bits 21-22 + ('synchro_cpl', 0x00800000, 23), # bit 23 + + ('couleur_jour', 0x03000000, 24), # bits 24-25 + ('couleur_demain', 0x0C000000, 26), # bits 26-27 + + ('preavis_pointe_mobile', 0x30000000, 28), # bits 28-29 ('pointe_mobile', 0xC0000000, 30), # bits 30-31 ] # Define the mappings for each attribute MAPPINGS = { - 'contact_sec': {0: "fermé", 1: "ouvert"}, - 'etat_cache_bornes': {0: "fermé", 1: "ouvert"}, - 'mode_fonctionnement': {0: "consommateur", 1: "producteur"}, - 'sens_energie': {0: "énergie active positive", 1: "énergie active négative"}, - 'Mode_horloge': {0: "horloge correcte", 1: "horloge en mode dégradée"}, - 'sortie_tic': {0: "mode historique", 1: "mode standard"}, - 'sortie_euridis': {0: "désactivée", 1: "activée sans sécurité", 3: "activée avec sécurité"}, - 'status_cpl': {0: "New/Unlock", 1: "New/Lock", 2: "Registered"}, - 'synchro_cpl': {0: "compteur non synchronisé", 1: "compteur synchronisé"}, - 'couleur_jour': {0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, - 'couleur_demain': {0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, + 'contact_sec': { + 0: "fermé", 1: "ouvert"}, + 'etat_cache_bornes': { + 0: "fermé", 1: "ouvert"}, + 'mode_fonctionnement': { + 0: "consommateur", 1: "producteur"}, + 'sens_energie': { + 0: "énergie active positive", 1: "énergie active négative"}, + 'tarif_fourniture': { + 0: "énergie ventilée sur Index 1", 1: "énergie ventilée sur Index 2", 2: "énergie ventilée sur Index 3", + 3: "énergie ventilée sur Index 4", 4: "énergie ventilée sur Index 5", 5: "énergie ventilée sur Index 6", + 6: "énergie ventilée sur Index 7", 7: "énergie ventilée sur Index 8", 8: "énergie ventilée sur Index 9", + 9: "énergie ventilée sur Index 1"}, + 'tarif_distributeur': { + 0: "énergie ventilée sur Index 1", + 1: "énergie ventilée sur Index 2", + 2: "énergie ventilée sur Index 3", + 3: "énergie ventilée sur Index 4", + }, + 'Mode_horloge': { + 0: "horloge correcte", 1: "horloge en mode dégradée"}, + 'sortie_tic': { + 0: "mode historique", 1: "mode standard"}, + 'sortie_euridis': { + 0: "désactivée", 1: "activée sans sécurité", 3: "activée avec sécurité"}, + 'status_cpl': { + 0: "New/Unlock", 1: "New/Lock", 2: "Registered"}, + 'synchro_cpl': { + 0: "compteur non synchronisé", 1: "compteur synchronisé"}, + 'couleur_jour': { + 0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, + 'couleur_demain': { + 0: "Pas d'annonce", 1: "Bleu", 2: "Blanc", 3: "Rouge"}, + 'preavis_pointe_mobile': { + 0: "Pas de pointe mobile", 1: "PM 1 en cours", + 2: "PM 2 en cours", 3: "PM 3 en cours"}, + 'pointe_mobile': { + 0: "Pas de pointe mobile", 1: "PM 1 en cours", + 2: "PM 2 en cours", 3: "PM 3 en cours"} } # Initialize the result dictionary @@ -502,14 +520,14 @@ def decode_STEG(stge): def zlinky_sum_all_indexes(self, nwkid): - zlinky_info = self.ListOfDevices.get(nwkid, {}).get("ZLinky", {}) + zlinky_info = self.ListOfDevices.get(nwkid, {}).get(ATTR_ZLINKY, {}) index_mid_info = zlinky_info.get("INDEX_MID", {}) return index_mid_info.get("CompteurTotalisateur", 0) def zlinky_totalisateur(self, nwkid, attribute, value): - zlinky_info = self.ListOfDevices.setdefault(nwkid, {}).setdefault("ZLinky", {}) + zlinky_info = self.ListOfDevices.setdefault(nwkid, {}).setdefault(ATTR_ZLINKY, {}) index_mid_info = zlinky_info.setdefault("INDEX_MID", {"CompteurTotalisateur": 0}) previous_index = index_mid_info.get(attribute, {}).get("Compteur", 0) diff --git a/ReleaseNotes.md b/ReleaseNotes.md index dda14aee4..7c7cca789 100644 --- a/ReleaseNotes.md +++ b/ReleaseNotes.md @@ -23,6 +23,8 @@ Release Numbering - [Technical] - Improve plugin resiliance when connection with coordintaor is lost. - [Technical] - Support network based coordinator over Etherneth such as SMLIGHT - ADAPTATEUR USB ETHERNET POE ZIGBEE 3.0 +- [Hardware] - Improve ZLinky integration and manage TEMPO Colour on Historic and Standard mode + ## Jan. 2025 - stable7.1.020 (2025.2) - hot fix - [Issue] - fix issue which was preventing upgrading from 018 to later version diff --git a/Z4D_decoders/z4d_decoder_Read_Attribute_Request.py b/Z4D_decoders/z4d_decoder_Read_Attribute_Request.py index 1836fc2b4..f8e450de3 100644 --- a/Z4D_decoders/z4d_decoder_Read_Attribute_Request.py +++ b/Z4D_decoders/z4d_decoder_Read_Attribute_Request.py @@ -9,6 +9,8 @@ from Modules.tools import (timeStamped, updLQI, updSQN, zigpy_plugin_sanity_check) from Modules.schneider_wiser import schneider_multiple_read_attribute_request +from Modules.timeServer import timeserver_multiple_read_attribute_request + def Decode0100(self, Devices, MsgData, MsgLQI): @@ -68,6 +70,11 @@ def Decode0100(self, Devices, MsgData, MsgLQI): self.log.logging(['Input', 'Schneider'], 'Debug', 'Decode0100 - specific ---- schneider_multiple_read_attribute_request') schneider_multiple_read_attribute_request(self, Devices, MsgSrcAddr, MsgSrcEp, MsgDstEp, MsgSqn, MsgClusterId, MsgManufSpec, MsgManufCode, MsgData[24:], nbAttribute) return + + if MsgClusterId == '000a': + self.log.logging(['Input', 'Schneider'], 'Debug', 'Decode0100 - specific ---- schneider_multiple_read_attribute_request') + timeserver_multiple_read_attribute_request(self, Devices, MsgSrcAddr, MsgSrcEp, MsgDstEp, MsgSqn, MsgClusterId, MsgManufSpec, MsgManufCode, MsgData[24:], nbAttribute) + return # Iterate over attributes in the message data for idx in range(24, len(MsgData), 4): @@ -75,8 +82,7 @@ def Decode0100(self, Devices, MsgData, MsgLQI): # Handle different cluster IDs and attributes if MsgClusterId == '000a': - self.log.logging('Input', 'Debug', 'Decode0100 - Received Time Server Cluster %s/%s Idx: %s Attribute: %s' % - (MsgSrcAddr, MsgSrcEp, idx, Attribute)) + self.log.logging('Input', 'Debug', 'Decode0100 - Received Time Server Cluster %s/%s Idx: %s Attribute: %s' % (MsgSrcAddr, MsgSrcEp, idx, Attribute)) timeserver_read_attribute_request(self, MsgSqn, MsgSrcAddr, MsgSrcEp, MsgClusterId, MsgManufSpec, MsgManufCode, Attribute) elif MsgClusterId == '0201' and (manuf == '105e' or manuf_name in ('Schneider', 'Schneider Electric')): @@ -98,51 +104,3 @@ def Decode0100(self, Devices, MsgData, MsgLQI): else: self.log.logging('Input', 'Log', 'Decode0100 - Read Attribute Request %s/%s Cluster %s Attribute %s' % (MsgSrcAddr, MsgSrcEp, MsgClusterId, Attribute)) - - -#def Decode0100(self, Devices, MsgData, MsgLQI): -# MsgSqn = MsgData[:2] -# MsgSrcAddr = MsgData[2:6] -# MsgSrcEp = MsgData[6:8] -# MsgDstEp = MsgData[8:10] -# updLQI(self, MsgSrcAddr, MsgLQI) -# timeStamped(self, MsgSrcAddr, 256) -# lastSeenUpdate(self, Devices, NwkId=MsgSrcAddr) -# if MsgSrcAddr not in self.ListOfDevices: -# if not zigpy_plugin_sanity_check(self, MsgSrcAddr): -# handle_unknow_device(self, MsgSrcAddr) -# return -# if 'Model' in self.ListOfDevices[MsgSrcAddr] and self.ListOfDevices[MsgSrcAddr]['Model'] == 'TI0001' or ('Manufacturer Name' in self.ListOfDevices[MsgSrcAddr] and self.ListOfDevices[MsgSrcAddr]['Manufacturer Name'] == 'LIVOLO'): -# self.log.logging('Input', 'Debug', 'Decode0100 - (Livolo) Read Attribute Request %s/%s Data %s' % (MsgSrcAddr, MsgSrcEp, MsgData)) -# livolo_read_attribute_request(self, Devices, MsgSrcAddr, MsgSrcEp, MsgData[30:32]) -# return -# MsgClusterId = MsgData[10:14] -# MsgDirection = MsgData[14:16] -# MsgManufSpec = MsgData[16:18] -# MsgManufCode = MsgData[18:22] -# nbAttribute = MsgData[22:24] -# self.log.logging('Input', 'Debug', 'Decode0100 - Mode: %s NwkId: %s SrcEP: %s DstEp: %s ClusterId: %s Direction: %s ManufSpec: %s ManufCode: %s nbAttribute: %s' % (MsgSqn, MsgSrcAddr, MsgSrcEp, MsgDstEp, MsgClusterId, MsgDirection, MsgManufSpec, MsgManufCode, nbAttribute)) -# updSQN(self, MsgSrcAddr, MsgSqn) -# manuf = manuf_name = '' -# if 'Manufacturer Name' in self.ListOfDevices[MsgSrcAddr]: -# manuf_name = self.ListOfDevices[MsgSrcAddr]['Manufacturer Name'] -# if 'Manufacturer' in self.ListOfDevices[MsgSrcAddr]: -# manuf = self.ListOfDevices[MsgSrcAddr]['Manufacturer'] -# for idx in range(24, len(MsgData), 4): -# Attribute = MsgData[idx:idx + 4] -# if MsgClusterId == '000a': -# self.log.logging('Input', 'Debug', 'Decode0100 - Received Time Server Cluster %s/%s Idx: %s Attribute: %s' % (MsgSrcAddr, MsgSrcEp, idx, Attribute)) -# timeserver_read_attribute_request(self, MsgSqn, MsgSrcAddr, MsgSrcEp, MsgClusterId, MsgManufSpec, MsgManufCode, Attribute) -# elif MsgClusterId == '0201' and (manuf == '105e' or manuf_name == 'Schneider' or manuf_name == 'Schneider Electric'): -# wiser_read_attribute_request(self, Devices, MsgSrcAddr, MsgSrcEp, MsgSqn, MsgClusterId, Attribute) -# self.log.logging('Schneider', 'Debug', 'Decode0100 - Sqn: %s NwkId: %s SrcEP: %s DstEp: %s ClusterId: %s Direction: %s ManufSpec: %s ManufCode: %s nbAttribute: %s' % (MsgSqn, MsgSrcAddr, MsgSrcEp, MsgDstEp, MsgClusterId, MsgDirection, MsgManufSpec, MsgManufCode, nbAttribute)) -# elif MsgClusterId == '0000' and Attribute == 'f000' and (manuf_name in ('1021', 'Legrand')): -# self.log.logging('Legrand', 'Debug', 'Decode0100 - Sqn: %s NwkId: %s SrcEP: %s DstEp: %s ClusterId: %s Direction: %s ManufSpec: %s ManufCode: %s nbAttribute: %s' % (MsgSqn, MsgSrcAddr, MsgSrcEp, MsgDstEp, MsgClusterId, MsgDirection, MsgManufSpec, MsgManufCode, nbAttribute)) -# if self.pluginconf.pluginConf['LegrandCompatibilityMode']: -# operation_time = int(time.time() - self.statistics._start) -# self.log.logging('Legrand', 'Debug', '------> Operation time: %s' % operation_time, MsgSrcAddr) -# read_attribute_response(self, MsgSrcAddr, MsgSrcEp, MsgSqn, MsgClusterId, '00', '23', Attribute, '%08x' % operation_time, manuf_code=MsgManufCode) -# elif MsgClusterId == '0000' and Attribute == '0000': -# read_attribute_response(self, MsgSrcAddr, MsgSrcEp, MsgSqn, MsgClusterId, '00', '20', Attribute, '%02x' % 3, manuf_code=MsgManufCode) -# else: -# self.log.logging('Input', 'Log', 'Decode0100 - Read Attribute Request %s/%s Cluster %s Attribute %s' % (MsgSrcAddr, MsgSrcEp, MsgClusterId, Attribute)) \ No newline at end of file diff --git a/Zigbee/zclDecoders.py b/Zigbee/zclDecoders.py index c2989a26b..ab4a573e0 100644 --- a/Zigbee/zclDecoders.py +++ b/Zigbee/zclDecoders.py @@ -8,7 +8,8 @@ import struct from os import stat -from Modules.tools import (is_direction_to_client, is_direction_to_server, +from Modules.tools import (get_deviceconf_parameter_value, + is_direction_to_client, is_direction_to_server, retreive_cmd_payload_from_8002) from Modules.zigateConsts import (SIZE_DATA_TYPE, ZIGATE_EP, composite_value, discrete_value) @@ -32,20 +33,33 @@ def is_duplicate_zcl_frame(self, nwkid, cluster_id, sqn, default_response_disabl Returns: bool: True if the frame is a duplicate, False otherwise. """ + if self.zigbee_communication != "zigpy": + # No check for zigate + return False + if nwkid not in self.ListOfDevices: + # The device is not yet known + return False + if "Model" not in self.ListOfDevices[nwkid]: + return False + if not default_response_disable: + # ???? + return False + if ( - self.zigbee_communication != "zigpy" - or nwkid not in self.ListOfDevices - or not default_response_disable - or not self.pluginconf.pluginConf.get("enableZclDuplicatecheck", False) + get_deviceconf_parameter_value(self, self.ListOfDevices[nwkid]["Model"], "enableZclDuplicatecheck", return_default=False) + or self.pluginconf.pluginConf.get("enableZclDuplicatecheck", False) ): - return False # No duplicate check needed + # We have disabled the ZCL SQN duplicate check + return False zcl_sqn = self.ListOfDevices.setdefault(nwkid, {}).setdefault("ZCL-IN-SQN", {}) - if sqn == zcl_sqn.get(cluster_id): + if sqn != "00" and sqn == zcl_sqn.get(cluster_id): + self.log.logging("zclDecoder", "Log", f"Duplicate frame {nwkid} sqn: {sqn} sqn_clusters{zcl_sqn} cluster_id: {cluster_id} default_response_disable: {default_response_disable}", nwkid) return True # Duplicate frame detected zcl_sqn[cluster_id] = sqn # Store new sequence number + return False @@ -77,7 +91,7 @@ def zcl_decoders(self, src_nwk_id, src_endpoint, target_ep, cluster_id, payload, # Check for duplicate ZCL frames if is_duplicate_zcl_frame(self, src_nwk_id, cluster_id, sqn, disable_default_response): - self.log.logging("zclDecoder", "Debug", f"Duplicate frame [{sqn}] {payload}") + self.log.logging("zclDecoder", "Log", f"Duplicate frame found [{sqn}] {payload}") return None # Log ZCL message details