Skip to content

Commit

Permalink
Model classes throw change and remove events.
Browse files Browse the repository at this point in the history
Optional use external loop
  • Loading branch information
hahn-th committed May 21, 2024
1 parent 716d1cd commit a401e0d
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 30 deletions.
3 changes: 3 additions & 0 deletions src/homematicip/events/hmip_event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ async def process_event_async(self, event_json):

client = self._model.clients.pop(data['id'], None)
if client is not None:
client.remove_object()
await self._event_manager.publish(ModelUpdateEvent.ITEM_REMOVED, client)

elif event_type == EventType.DEVICE_ADDED:
Expand All @@ -64,6 +65,7 @@ async def process_event_async(self, event_json):

device = self._model.devices.pop(data['id'], None)
if device is not None:
device.remove_object()
await self._event_manager.publish(ModelUpdateEvent.ITEM_REMOVED, device)

elif event_type == EventType.GROUP_ADDED:
Expand All @@ -84,6 +86,7 @@ async def process_event_async(self, event_json):

group = self._model.groups.pop(data['id'], None)
if group is not None:
group.remove_object()
await self._event_manager.publish(ModelUpdateEvent.ITEM_REMOVED, group)

elif event_type == EventType.HOME_CHANGED:
Expand Down
65 changes: 35 additions & 30 deletions src/homematicip/model/hmip_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from pydantic import BaseModel, ConfigDict, Field
from typing import ClassVar

from pydantic import BaseModel, ConfigDict, Field, PrivateAttr


class HmipBaseModel(BaseModel):
Expand All @@ -7,7 +9,33 @@ class HmipBaseModel(BaseModel):
arbitrary_types_allowed=True
)

_on_remove_handler: list = PrivateAttr(default_factory=list)
_on_update_handler: list = PrivateAttr(default_factory=list)

def _on_remove(self):
"""Call all subscribers of the on_remove event."""
for subscriber in self._on_remove_handler:
subscriber(self)

def _on_update(self):
"""Call all subscribers of the on_update event."""
for subscriber in self._on_update_handler:
subscriber(self)

def subscribe_on_remove(self, subscriber):
"""Subscribe to the on_remove event of this model. The subscriber will be called with the removed model as
argument."""
if subscriber not in self._on_remove_handler:
self._on_remove_handler.append(subscriber)

def subscribe_on_update(self, subscriber):
"""Subscribe to the on_update event of this model. The subscriber will be called with the updated model as
argument."""
if subscriber not in self._on_update_handler:
self._on_update_handler.append(subscriber)

def update_from_dict(self, data_dict):
"""Update this model from a dictionary. This will call all subscribers of the on_update event."""

model_fields = self.model_fields
new_item = type(self).model_validate(data_dict, strict=False)
Expand All @@ -21,33 +49,10 @@ def update_from_dict(self, data_dict):

setattr(self, field_name, new_value)

# raise on update event and notify all subscribers
self._on_update()


# if isinstance(value, dict):
# for key_dict, value_value in value.items():
# if key_dict in target_field_value:
# target_field_value[key_dict] = value_value
# else:
# setattr(self, field_name, value)
#
# # Keys durchlaufen
# # Key in target_field_value suchen
# # Wenn gefunden, dann update_from_dict(value)

# if isinstance(target_field_value, dict[str, HmipBaseModel]):
# target_field_value.update_from_dict(value)
#
# if isinstance(target_field_value, HmipBaseModel):
# target_field_value.update_from_dict(value)
#
# else:
# setattr(self, field_name, value)
# #
# # if isinstance(value, dict) and issubclass(field.outer_type_, BaseModel):
# # field_value = getattr(self, field_name, None)
# # if isinstance(field_value, BaseModel):
# # field_value.update_from_dict(value)
# # else:
# # setattr(self, field_name, field.outer_type_(**value))
# # else:
# # setattr(self, field_name, value)
def remove_object(self):
"""Must be called, when this item has been removed from the object tree. This will call all subscribers of the
on_remove event."""
self._on_remove()
5 changes: 5 additions & 0 deletions src/homematicip/runner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from dataclasses import dataclass
import json
import logging
Expand Down Expand Up @@ -25,6 +26,7 @@
class Runner:
model: Model = None
event_manager: EventManager = None
external_loop: asyncio.AbstractEventLoop = None

_connection_context: ConnectionContext = None
_rest_connection: RestConnection = None
Expand All @@ -34,6 +36,9 @@ def __post_init__(self):
if self.event_manager is None:
self.event_manager = EventManager()

if self.external_loop is not None:
asyncio.set_event_loop(self.external_loop)

def _ensure_config(self):
"""Ensure that a configuration is set. If not, try to load it from well-known locations."""
if self._config is None:
Expand Down
44 changes: 44 additions & 0 deletions tests/model/test_model.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json
from unittest.mock import Mock

from homematicip.model import build_model_from_json
from homematicip.model.group import Group, GroupChannelReference
from homematicip.model.hmip_base import HmipBaseModel
from homematicip.model.home import Home, FunctionalHome


Expand Down Expand Up @@ -60,3 +63,44 @@ def test_home_functional_homes(filled_model):
assert light_and_shadow.active == True
assert len(light_and_shadow.extendedLinkedShutterGroups) == 0
assert light_and_shadow.solution == "LIGHT_AND_SHADOW"


def test_on_update_event():
"""Test, if the on_update event is triggered correctly."""
subscriber = Mock()
model = HmipBaseModel()
model.subscribe_on_update(subscriber)

model._on_update()

subscriber.assert_called_once_with(model)


def test_on_removed_event():
"""Test, if the on_remove event is triggered correctly."""
subscriber = Mock()
model = HmipBaseModel()
model.subscribe_on_remove(subscriber)

model._on_remove()

subscriber.assert_called_once_with(model)

def test_hmipbasemodel_subscribers_after_update(sample_data_complete):
device_id = "3014F7110000RAIN_SENSOR"
updated_json_binary = b'{"availableFirmwareVersion":"1.0.18","connectionType":"HMIP_RF","firmwareVersion":"1.0.18","firmwareVersionInteger":65554,"functionalChannels":{"0":{"busConfigMismatch":null,"coProFaulty":false,"coProRestartNeeded":false,"coProUpdateFailure":false,"configPending":false,"deviceId":"3014F7110000RAIN_SENSOR","deviceOverheated":false,"deviceOverloaded":false,"devicePowerFailureDetected":false,"deviceUndervoltage":false,"dutyCycle":false,"functionalChannelType":"DEVICE_BASE","groupIndex":0,"groups":["00000000-0000-0000-0000-000000000042"],"index":0,"label":"","lowBat":null,"multicastRoutingEnabled":false,"powerShortCircuit":null,"routerModuleEnabled":false,"routerModuleSupported":false,"rssiDeviceValue":-91,"rssiPeerValue":null,"shortCircuitDataLine":null,"supportedOptionalFeatures":{"IFeatureBusConfigMismatch":false,"IFeatureDeviceCoProError":false,"IFeatureDeviceCoProRestart":false,"IFeatureDeviceCoProUpdate":false,"IFeatureDeviceIdentify":false,"IFeatureDeviceOverheated":false,"IFeatureDeviceOverloaded":false,"IFeatureDevicePowerFailure":false,"IFeatureDeviceTemperatureOutOfRange":false,"IFeatureDeviceUndervoltage":false,"IFeatureMulticastRouter":false,"IFeaturePowerShortCircuit":false,"IFeatureRssiValue":true,"IFeatureShortCircuitDataLine":false,"IOptionalFeatureDutyCycle":true,"IOptionalFeatureLowBat":false},"temperatureOutOfRange":false,"unreach":false},"1":{"deviceId":"3014F7110000RAIN_SENSOR","functionalChannelType":"RAIN_DETECTION_CHANNEL","groupIndex":1,"groups":["00000000-0000-0000-0000-000000000043"],"index":1,"label":"","rainSensorSensitivity":10.0,"raining":false}},"homeId":"00000000-0000-0000-0000-000000000001","id":"3014F7110000RAIN_SENSOR","label":"Regensensor","lastStatusUpdate":1610893608847,"liveUpdateState":"LIVE_UPDATE_NOT_SUPPORTED","manufacturerCode":1,"modelId":412,"modelType":"HmIP-SRD","oem":"eQ-3","permanentlyReachable":true,"serializedGlobalTradeItemNumber":"3014F7110000RAIN_SENSOR","type":"RAIN_SENSOR","updateState":"UP_TO_DATE"}'
updated_json = json.loads(updated_json_binary)
base = build_model_from_json(sample_data_complete)
device = base.devices[device_id]
last_update_status_before = device.lastStatusUpdate
subscriber = Mock()

device.subscribe_on_update(subscriber)

device.update_from_dict({**updated_json})

assert device.id == device_id
assert last_update_status_before != device.lastStatusUpdate
assert len(device._on_update_handler) == 1
subscriber.assert_called_once_with(device)
assert base.devices[device_id].lastStatusUpdate == device.lastStatusUpdate

0 comments on commit a401e0d

Please sign in to comment.