diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 30f926b..651b59f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -41,5 +41,5 @@ repos: 'katcp-codec==0.1.0', 'pytest==8.1.1', 'types-decorator==5.1.1', - 'typing-extensions==4.11.0' + 'typing-extensions==4.11.0', ] diff --git a/src/aiokatcp/core.py b/src/aiokatcp/core.py index 8c1d4f4..d7596fe 100644 --- a/src/aiokatcp/core.py +++ b/src/aiokatcp/core.py @@ -79,6 +79,8 @@ class Address: _IPV6_RE = re.compile(r"^\[(?P[^]]+)\](:(?P\d+))?$") def __init__(self, host: _IPAddress, port: Optional[int] = None) -> None: + if not isinstance(host, typing.get_args(_IPAddress)): + raise TypeError(f"{host} is not of either {typing.get_args(_IPAddress)}") self._host = host self._port = port diff --git a/src/aiokatcp/sensor.py b/src/aiokatcp/sensor.py index 60364d0..f8c8120 100644 --- a/src/aiokatcp/sensor.py +++ b/src/aiokatcp/sensor.py @@ -30,6 +30,7 @@ import enum import functools import inspect +import numbers import time import warnings import weakref @@ -168,6 +169,7 @@ def __init__( self.stype = sensor_type type_info = core.get_type(sensor_type) self.type_name = type_info.name + self._core_type = type_info.type_ self._classic_observers: Set[ClassicObserver[_T]] = set() self._delta_observers: Set[DeltaObserver[_T]] = set() self.name = name @@ -177,7 +179,7 @@ def __init__( if default is None: value: _T = type_info.default(sensor_type) else: - value = default + value = self._cast_value_type(default) self._reading = Reading(time.time(), initial_status, value) if auto_strategy is None: self.auto_strategy = SensorSampler.Strategy.AUTO @@ -186,6 +188,38 @@ def __init__( self.auto_strategy_parameters = tuple(auto_strategy_parameters) # TODO: should validate the parameters against the strategy. + def _cast_value_type(self, value: Any) -> _T: + """Coerce incoming value to the sensor type. + + If it is compatible, cast it to the expected data type to ensure the + incoming value is in the required format. This also handles the special + case of :class:`core.Timestamp` where it is used with `float` type + interchangeably. + + Raises + ------ + TypeError + If the incoming `value` type is not compatible with the sensor's + core type. + """ + if isinstance(value, self.stype): + # The value is not a special case and can be returned unchanged + return value + elif isinstance(value, self._core_type) and not issubclass(self._core_type, enum.Enum): + # The more general case where the value is not derived from stype + # but is derived from the registered base type. e.g. numpy reals + # don't derive from Python's float, but do derive from numbers.Real. + # Explicitly cast it into the desired type. + return self.stype(value) # type: ignore [call-arg] + elif isinstance(value, numbers.Real) and self.stype is core.Timestamp: + # core.Timestamp can also be an int or float + return self.stype(value) # type: ignore [call-arg] + else: + raise TypeError( + f"Value type {type(value)} is not compatible with Sensor type " + f"{self.stype} with core type {self._core_type}" + ) + def notify(self, reading: Reading[_T], old_reading: Reading[_T]) -> None: """Notify all observers of changes to this sensor. @@ -198,10 +232,14 @@ def notify(self, reading: Reading[_T], old_reading: Reading[_T]) -> None: delta_observer(self, reading, old_reading=old_reading) def set_value( - self, value: _T, status: Optional[Status] = None, timestamp: Optional[float] = None + self, value: Any, status: Optional[Status] = None, timestamp: Optional[float] = None ) -> None: """Set the current value of the sensor. + Also validate that the incoming value type is compatible with the core + type of this sensor. If compatible, coerce it to an instance of the + :attr:`stype`. + Parameters ---------- value @@ -214,12 +252,19 @@ def set_value( timestamp The time at which the sensor value was determined (seconds). If not given, it defaults to :func:`time.time`. + + Raises + ------ + TypeError + If the incoming `value` type is not compatible with the sensor's + core type. """ + checked_value = self._cast_value_type(value) if timestamp is None: timestamp = time.time() if status is None: - status = self.status_func(value) - reading = Reading(timestamp, status, value) + status = self.status_func(checked_value) + reading = Reading(timestamp, status, checked_value) old_reading = self._reading self._reading = reading self.notify(reading, old_reading) @@ -233,7 +278,7 @@ def value(self) -> _T: return self.reading.value @value.setter - def value(self, value: _T) -> None: + def value(self, value: Any) -> None: self.set_value(value) @property diff --git a/tests/test_core.py b/tests/test_core.py index cd1bdc4..8c0824c 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -131,6 +131,11 @@ def test_parse_bad(self, value) -> None: with pytest.raises(ValueError): Address.parse(value) + @pytest.mark.parametrize("value", ["127.0.0.1"]) + def test_bad_host(self, value) -> None: + with pytest.raises(TypeError): + Address(value) + class TestEncodeDecode: VALUES = [ diff --git a/tests/test_sensor.py b/tests/test_sensor.py index 6e9e2bc..8897fe5 100644 --- a/tests/test_sensor.py +++ b/tests/test_sensor.py @@ -30,15 +30,19 @@ """ import asyncio +import enum import gc import unittest import weakref -from typing import List, Optional +from fractions import Fraction +from ipaddress import IPv4Address +from typing import Any, List, Optional, Type, TypeVar from unittest import mock from unittest.mock import create_autospec import pytest +from aiokatcp.core import Address, Timestamp from aiokatcp.sensor import ( AggregateSensor, Reading, @@ -49,6 +53,8 @@ _weak_callback, ) +_T = TypeVar("_T") + @pytest.mark.parametrize( "status,valid", @@ -80,6 +86,88 @@ def status_func(value): assert sensor.status == Sensor.Status.NOMINAL +class MyEnum(enum.Enum): + ZERO = 0 + ONE = 1 + + +class OtherEnum(enum.Enum): + ABC = 0 + DEF = 1 + + +@pytest.mark.parametrize( + "sensor_type,compatible_value", + [ + (bool, True), + (int, 1234), + (float, 1234), + (float, 1234.5), + (float, Fraction(8190, 64)), + (str, "one-two-three-four"), + (bytes, b"one-two-three-four"), + (Address, Address(IPv4Address("1.2.3.4"))), + (Timestamp, 12345678), + (MyEnum, MyEnum.ONE), + ], +) +@pytest.mark.parametrize("initialise_sensor", [True, False]) +def test_sensor_value_setter_success( + sensor_type: Type, + compatible_value: Any, + initialise_sensor: bool, +) -> None: + """Check a compatible value against a sensor type. + + Using `initialise_sensor`, this test checks `compatible_value` either as a + default sensor value, or as a new value to an existing sensor. + """ + sensor = Sensor( + sensor_type, + "test-sensor", + default=compatible_value if initialise_sensor else None, + ) + if not initialise_sensor: + sensor.value = compatible_value + assert sensor.value == compatible_value + assert isinstance(sensor.value, sensor_type) + + +@pytest.mark.parametrize( + "sensor_type,bad_value", + [ + (bool, "True"), + (int, "1234"), + (int, 1234.5), + (float, 1 + 2j), + (str, {"a": 1}), + (str, b"bytes"), + (bytes, "str"), + (Address, "0.0.0.0"), + (Timestamp, "12345678"), + (MyEnum, OtherEnum.ABC), + ], +) +def test_sensor_value_setter_failure( + sensor_type: Type[_T], + bad_value: Any, +) -> None: + """Check an incompatible value for a sensor. + + Check the `bad_value` is not compatible both as a new value for an existing + sensor and as an initial value for a new sensor. + """ + sensor = Sensor( + sensor_type, + "test-sensor", + ) + with pytest.raises(TypeError): + sensor.value = bad_value + + with pytest.raises(TypeError): + Sensor(sensor_type, "test-sensor", default=bad_value) + + @pytest.fixture def classic_observer(): def classic(sensor, reading):