Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweak sensor value setter to validate dtype #98

Merged
merged 14 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ repos:
'katcp-codec==0.1.0',
'pytest==8.1.1',
'types-decorator==5.1.1',
'typing-extensions==4.11.0'
'typing-extensions==4.11.0',
'numpy==1.24.4'
]
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ test = [
"pytest",
"pytest-asyncio>=0.23",
"pytest-mock",
"numpy",
bmerry marked this conversation as resolved.
Show resolved Hide resolved
]
doc = [
"sphinx",
Expand Down
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pytest-asyncio
pytest-cov
pytest-mock
typing-extensions
numpy
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ katcp-codec==0.1.0
# via -r requirements.in
nodeenv==1.8.0
# via pre-commit
numpy==1.24.4
# via -r requirements.in
packaging==24.0
# via pytest
platformdirs==4.2.0
Expand Down
5 changes: 4 additions & 1 deletion src/aiokatcp/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ class Address:
_IPV6_RE = re.compile(r"^\[(?P<host>[^]]+)\](:(?P<port>\d+))?$")

def __init__(self, host: _IPAddress, port: Optional[int] = None) -> None:
self._host = host
if isinstance(host, typing.get_args(_IPAddress)):
self._host = host
else:
raise TypeError(f"{host} is not of either {typing.get_args(_IPAddress)}")
bmerry marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Coveralls is failing and it looks like it's because there is no test coverage for this. Add a unit test that tries to construct an Address from the wrong type.

self._port = port

@property
Expand Down
53 changes: 48 additions & 5 deletions src/aiokatcp/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import enum
import functools
import inspect
import numbers
import time
import warnings
import weakref
Expand Down Expand Up @@ -168,6 +169,7 @@ def __init__(
self.stype = sensor_type
type_info = core.get_type(sensor_type)
self.type_name = type_info.name
self._core_type = type_info.type_
self._classic_observers: Set[ClassicObserver[_T]] = set()
self._delta_observers: Set[DeltaObserver[_T]] = set()
self.name = name
Expand All @@ -177,7 +179,7 @@ def __init__(
if default is None:
value: _T = type_info.default(sensor_type)
else:
value = default
value = self._check_value_type(default)
self._reading = Reading(time.time(), initial_status, value)
if auto_strategy is None:
self.auto_strategy = SensorSampler.Strategy.AUTO
Expand All @@ -186,6 +188,37 @@ def __init__(
self.auto_strategy_parameters = tuple(auto_strategy_parameters)
# TODO: should validate the parameters against the strategy.

def _check_value_type(self, value: Any) -> _T:
"""Verify incoming value is compatible with sensor type.
bmerry marked this conversation as resolved.
Show resolved Hide resolved

If it is compatible, cast it to the expected data type to ensure the
incoming value is in the required format. This also handles the special
case of :class:`core.Timestamp` where it is used with `float` type
interchangeably.

Raises
------
TypeError
If the incoming `value` type is not compatible with the sensor's
core type.
"""
if isinstance(value, self.stype):
# It's an exact match
bmerry marked this conversation as resolved.
Show resolved Hide resolved
return value
bmerry marked this conversation as resolved.
Show resolved Hide resolved
elif isinstance(value, self._core_type) and not issubclass(self._core_type, enum.Enum):
# The more general case of e.g. numpy types not being directly
# comparable with python types. Explicitly cast it into the desired
# type.
bmerry marked this conversation as resolved.
Show resolved Hide resolved
return self.stype(value) # type: ignore [call-arg]
elif isinstance(value, numbers.Real) and self.stype is core.Timestamp:
# core.Timestamp can also be an int or float
return self.stype(value) # type: ignore [call-arg]
else:
raise TypeError(
f"Value type {type(value)} is not compatible with Sensor type "
f"{self.stype} with core type {self._core_type}"
)

def notify(self, reading: Reading[_T], old_reading: Reading[_T]) -> None:
"""Notify all observers of changes to this sensor.

Expand All @@ -198,10 +231,13 @@ def notify(self, reading: Reading[_T], old_reading: Reading[_T]) -> None:
delta_observer(self, reading, old_reading=old_reading)

def set_value(
self, value: _T, status: Optional[Status] = None, timestamp: Optional[float] = None
self, value: Any, status: Optional[Status] = None, timestamp: Optional[float] = None
) -> None:
"""Set the current value of the sensor.

Also validate that the incoming value type is compatible with the core
type of this sensor.

bmerry marked this conversation as resolved.
Show resolved Hide resolved
Parameters
----------
value
Expand All @@ -214,12 +250,19 @@ def set_value(
timestamp
The time at which the sensor value was determined (seconds).
If not given, it defaults to :func:`time.time`.

Raises
------
TypeError
If the incoming `value` type is not compatible with the sensor's
core type.
"""
checked_value = self._check_value_type(value)
bmerry marked this conversation as resolved.
Show resolved Hide resolved
if timestamp is None:
timestamp = time.time()
if status is None:
status = self.status_func(value)
reading = Reading(timestamp, status, value)
status = self.status_func(checked_value)
reading = Reading(timestamp, status, checked_value)
old_reading = self._reading
self._reading = reading
self.notify(reading, old_reading)
Expand All @@ -233,7 +276,7 @@ def value(self) -> _T:
return self.reading.value

@value.setter
def value(self, value: _T) -> None:
def value(self, value: Any) -> None:
self.set_value(value)

@property
Expand Down
84 changes: 83 additions & 1 deletion tests/test_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,19 @@
"""

import asyncio
import enum
import gc
import unittest
import weakref
from typing import List, Optional
from ipaddress import IPv4Address
from typing import Any, List, Optional, Type, TypeVar
from unittest import mock
from unittest.mock import create_autospec

import numpy as np
import pytest

from aiokatcp.core import Address, Timestamp
from aiokatcp.sensor import (
AggregateSensor,
Reading,
Expand All @@ -49,6 +53,8 @@
_weak_callback,
)

_T = TypeVar("_T")


@pytest.mark.parametrize(
"status,valid",
Expand Down Expand Up @@ -80,6 +86,82 @@ def status_func(value):
assert sensor.status == Sensor.Status.NOMINAL


class MyEnum(enum.Enum):
ZERO = 0
ONE = 1


class OtherEnum(enum.Enum):
ABC = 0
DEF = 1


@pytest.mark.parametrize(
"sensor_name,sensor_type,default,compatible_value",
[
("bool", bool, False, True),
("int", int, 0, np.int32(1234)),
("float", float, 0.0, np.int32(1234678)),
("float", float, 0.0, np.float32(12345678)),
("str", str, "zero", "one-two-three-four"),
("bytes", bytes, b"zero", b"one-two-three-four"),
("address", Address, Address(IPv4Address("0.0.0.0")), Address(IPv4Address("1.2.3.4"))),
("timestamp", Timestamp, Timestamp(0.0), 12345678),
("enum", MyEnum, MyEnum.ZERO, MyEnum.ONE),
],
)
def test_sensor_value_setter_success(
sensor_name: str,
bmerry marked this conversation as resolved.
Show resolved Hide resolved
sensor_type: Type[_T],
bmerry marked this conversation as resolved.
Show resolved Hide resolved
default: Any,
bmerry marked this conversation as resolved.
Show resolved Hide resolved
compatible_value: Any,
) -> None:
"""Check a compatible value against a sensor type.

This uses a default value to ensure the :class:`Sensor`\'s constructor
also performs the value type check. It also checks compatibility with
numpy scalars.
"""
sensor_under_test = Sensor(
bmerry marked this conversation as resolved.
Show resolved Hide resolved
sensor_type,
f"test-{sensor_name}",
default,
)
sensor_under_test.value = compatible_value
assert sensor_under_test.value == compatible_value
bmerry marked this conversation as resolved.
Show resolved Hide resolved


@pytest.mark.parametrize(
"sensor_name,sensor_type,bad_value",
[
("bool", bool, "True"),
("int", int, "1234"),
bmerry marked this conversation as resolved.
Show resolved Hide resolved
("float", float, "1234.5"),
("str", str, {"a": 1}),
bmerry marked this conversation as resolved.
Show resolved Hide resolved
("bytes", bytes, "1"),
(
"address",
Address,
"0.0.0.0",
),
("timestamp", Timestamp, "12345678"),
("enum", MyEnum, OtherEnum.ABC),
],
)
def test_sensor_value_setter_failure(
sensor_name: str,
bmerry marked this conversation as resolved.
Show resolved Hide resolved
sensor_type: Type[_T],
bad_value: Any,
) -> None:
"""Check an incompatible value for a sensor"""
sensor_under_test = Sensor(
sensor_type,
f"test-{sensor_name}",
)
with pytest.raises(TypeError):
sensor_under_test.value = bad_value
bmerry marked this conversation as resolved.
Show resolved Hide resolved

bmerry marked this conversation as resolved.
Show resolved Hide resolved

@pytest.fixture
def classic_observer():
def classic(sensor, reading):
Expand Down
Loading