Skip to content

Commit

Permalink
Separate tango everything device server and client logic. Breaks test…
Browse files Browse the repository at this point in the history
…s requiring converters for tango commands
  • Loading branch information
jsouter committed Feb 18, 2025
1 parent 9e23079 commit 82bfccf
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 310 deletions.
4 changes: 4 additions & 0 deletions src/ophyd_async/tango/core/_tango_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,10 @@ def make_converter(info: AttributeInfoEx | CommandInfo) -> TangoConverter:
return TangoDevStateSpectrumConverter()
elif info.data_format == AttrDataFormat.IMAGE:
return TangoDevStateImageConverter()
else: # command info
match info.in_type:
case CmdArgType.DevState:
return TangoDevStateConverter()
# default case return trivial converter
return TangoConverter()

Expand Down
4 changes: 3 additions & 1 deletion src/ophyd_async/tango/core/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ def get_full_attr_trl(device_trl: str, attr_name: str):


def get_device_trl_and_attr(name: str):
# TODO put example of what the attr_name could look like here...
# trl can have form:
# <protocol://><server:host/>domain/family/member/attr_name<#dbase=no>
# e.g. tango://127.0.0.1:8888/test/nodb/test#dbase=no
re_str = (
r"([\.a-zA-Z0-9_-]*://)?([\.a-zA-Z0-9_-]+:[0-9]+/)?"
r"([^#/]+/[^#/]+/[^#/]+/)([^#/]+)(#dbase=[a-z]+)?"
Expand Down
3 changes: 1 addition & 2 deletions src/ophyd_async/tango/testing/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from ._one_of_everything import (
ExampleStrEnum,
OneOfEverythingTangoDevice,
everything_signal_info,
)

__all__ = ["ExampleStrEnum", "OneOfEverythingTangoDevice", "everything_signal_info"]
__all__ = ["ExampleStrEnum", "OneOfEverythingTangoDevice"]
281 changes: 127 additions & 154 deletions src/ophyd_async/tango/testing/_one_of_everything.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import textwrap
from dataclasses import dataclass
from random import choice
from typing import Any, Generic, TypeVar

import numpy as np
Expand All @@ -10,7 +9,6 @@
DTypeScalar_co,
StrictEnum,
)
from ophyd_async.tango.core import DevStateEnum
from ophyd_async.testing import float_array_value, int_array_value
from tango import AttrDataFormat, AttrWriteType, DevState
from tango.server import Device, attribute, command
Expand Down Expand Up @@ -40,171 +38,144 @@ def float_image_value(
return np.vstack((array_1d, array_1d))


def _valid_command(dformat: AttrDataFormat, dtype: str):
if dtype == "DevUChar":
return False
if dformat != AttrDataFormat.SCALAR and dtype in ["DevState", "DevEnum"]:
return False
return True


@dataclass
class AttributeData(Generic[T]):
name: str
tango_type: str
py_type: type
initial_value: T
random_put_values: tuple[T, ...]
dformat = AttrDataFormat.SCALAR
cmd_name: str | None

def random_value(self):
return choice(self.random_put_values)


class SpectrumData(AttributeData):
dformat = AttrDataFormat.SPECTRUM

def random_value(self):
array = self.initial_value.copy()
for idx in range(len(array)):
array[idx] = choice(self.random_put_values)
return array


class ImageData(AttributeData):
dformat = AttrDataFormat.IMAGE
cmd_name = None
initial_scalar: T
initial_spectrum: Array1D


_all_attribute_definitions = [
AttributeData(
"str",
"DevString",
"test_string",
np.array(["one", "two", "three"], dtype=str),
),
AttributeData(
"bool",
"DevBoolean",
True,
np.array([False, True], dtype=bool),
),
AttributeData("strenum", "DevEnum", 1, np.array([0, 1, 2])),
AttributeData("int8", "DevShort", 1, int_array_value(np.int8)),
AttributeData("uint8", "DevUChar", 1, int_array_value(np.uint8)),
AttributeData("int16", "DevShort", 1, int_array_value(np.int16)),
AttributeData("uint16", "DevUShort", 1, int_array_value(np.uint16)),
AttributeData("int32", "DevLong", 1, int_array_value(np.int32)),
AttributeData("uint32", "DevULong", 1, int_array_value(np.uint32)),
AttributeData("int64", "DevLong64", 1, int_array_value(np.int64)),
AttributeData("uint64", "DevULong64", 1, int_array_value(np.uint64)),
AttributeData("float32", "DevFloat", 1.234, float_array_value(np.float32)),
AttributeData("float64", "DevDouble", 1.234, float_array_value(np.float64)),
AttributeData(
"my_state",
"DevState",
DevState.INIT,
np.array([DevState.INIT, DevState.ON, DevState.MOVING], dtype=DevState),
),
]

def random_value(self):
array = self.initial_value.copy()
for idx in range(array.shape[1]):
array[0, idx] = choice(self.random_put_values)
array[1, idx] = choice(self.random_put_values)
return array


everything_signal_info = []


def add_ads(
name: str,
tango_type: str,
py_type: type,
initial_scalar,
initial_spectrum,
choices,
):
scalar_cmd = f"{name}_cmd" if tango_type != "DevUChar" else None
everything_signal_info.append(
AttributeData(name, tango_type, py_type, initial_scalar, choices, scalar_cmd)
)
spectrum_cmd = (
f"{name}_spectrum_cmd"
if tango_type not in ["DevUChar", "DevState", "DevEnum"]
else None
)
everything_signal_info.append(
SpectrumData(
f"{name}_spectrum",
tango_type,
Array1D[py_type],
initial_spectrum,
choices,
spectrum_cmd,
class OneOfEverythingTangoDevice(Device):
attr_values = {}
initial_values = {}

def _add_attr(self, attr: attribute, initial_value):
self.attr_values[attr.name] = initial_value
self.initial_values[attr.name] = initial_value
self.add_attribute(attr)
self.set_change_event(attr.name, True, False)

def add_scalar_attr(self, name: str, dtype: str, initial_value: Any):
attr = attribute(
name=name,
dtype=dtype,
dformat=AttrDataFormat.SCALAR,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
enum_labels=[e.value for e in ExampleStrEnum],
)
)
everything_signal_info.append(
ImageData(
f"{name}_image",
tango_type,
np.ndarray[Any, np.dtype[py_type]],
np.vstack((initial_spectrum, initial_spectrum)),
choices,
None,
self._add_attr(attr, initial_value)

def add_array_attrs(self, name: str, dtype: str, initial_value: np.ndarray):
spectrum_name = f"{name}_spectrum"
spectrum_attr = attribute(
name=spectrum_name,
dtype=dtype,
dformat=AttrDataFormat.SPECTRUM,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=initial_value.shape[-1],
enum_labels=[e.value for e in ExampleStrEnum],
)
)


add_ads(
"str",
"DevString",
str,
"test_string",
np.array(["one", "two", "three"], dtype=str),
("four", "five", "six"),
)
add_ads(
"bool",
"DevBoolean",
bool,
True,
np.array([False, True], dtype=bool),
(False, True),
)
add_ads("strenum", "DevEnum", StrictEnum, 1, np.array([0, 1, 2]), (0, 1, 2))
add_ads("int8", "DevShort", int, 1, int_array_value(np.int8), (1, 2, 3, 4, 5))
add_ads("uint8", "DevUChar", int, 1, int_array_value(np.uint8), (1, 2, 3, 4, 5))
add_ads("int16", "DevShort", int, 1, int_array_value(np.int16), (1, 2, 3, 4, 5))
add_ads("uint16", "DevUShort", int, 1, int_array_value(np.uint16), (1, 2, 3, 4, 5))
add_ads("int32", "DevLong", int, 1, int_array_value(np.int32), (1, 2, 3, 4, 5))
add_ads("uint32", "DevULong", int, 1, int_array_value(np.uint32), (1, 2, 3, 4, 5))
add_ads("int64", "DevLong64", int, 1, int_array_value(np.int64), (1, 2, 3, 4, 5))
add_ads("uint64", "DevULong64", int, 1, int_array_value(np.uint64), (1, 2, 3, 4, 5))
add_ads(
"float32",
"DevFloat",
float,
1.234,
float_array_value(np.float32),
(1.234, 2.345, 3.456),
)
add_ads(
"float64",
"DevDouble",
float,
1.234,
float_array_value(np.float64),
(1.234, 2.345, 3.456),
)
add_ads(
"my_state",
"DevState",
DevStateEnum,
DevState.INIT,
np.array([DevState.INIT, DevState.ON, DevState.MOVING], dtype=DevState),
(DevState.INIT, DevState.ON, DevState.MOVING),
)

image_name = f"{name}_image"
image_attr = attribute(
name=image_name,
dtype=dtype,
dformat=AttrDataFormat.IMAGE,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=initial_value.shape[-1],
max_dim_y=2,
enum_labels=[e.value for e in ExampleStrEnum],
)
self._add_attr(spectrum_attr, initial_value)
# have image just be 2 of the initial spectrum stacked
self._add_attr(image_attr, np.vstack((initial_value, initial_value)))

def add_scalar_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SCALAR, dtype):
self.add_command(
command(
f=getattr(self, f"{name}_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SCALAR,
dformat_out=AttrDataFormat.SCALAR,
),
)

class OneOfEverythingTangoDevice(Device):
attr_values = {}
def add_spectrum_command(self, name: str, dtype: str):
if _valid_command(AttrDataFormat.SPECTRUM, dtype):
self.add_command(
command(
f=getattr(self, f"{name}_spectrum_cmd"),
dtype_in=dtype,
dtype_out=dtype,
dformat_in=AttrDataFormat.SPECTRUM,
dformat_out=AttrDataFormat.SPECTRUM,
),
)

def initialize_dynamic_attributes(self):
self.reset_values()
for attr_data in everything_signal_info:
attr = attribute(
name=attr_data.name,
dtype=attr_data.tango_type,
dformat=attr_data.dformat,
access=AttrWriteType.READ_WRITE,
fget=self.read,
fset=self.write,
max_dim_x=0
if attr_data.dformat == AttrDataFormat.SCALAR
else attr_data.initial_value.shape[-1],
max_dim_y=2,
enum_labels=[e.value for e in ExampleStrEnum],
for attr_data in _all_attribute_definitions:
self.add_scalar_attr(
attr_data.name, attr_data.tango_type, attr_data.initial_scalar
)
self.add_array_attrs(
attr_data.name, attr_data.tango_type, attr_data.initial_spectrum
)
self.add_attribute(attr)
self.set_change_event(attr.name, True, False)
if attr_data.cmd_name:
self.add_command(
command(
f=getattr(self, attr_data.cmd_name),
dtype_in=attr_data.tango_type,
dtype_out=attr_data.tango_type,
dformat_in=attr_data.dformat,
dformat_out=attr_data.dformat,
)
)
self.add_scalar_command(attr_data.name, attr_data.tango_type)
self.add_spectrum_command(attr_data.name, attr_data.tango_type)

@command
def reset_values(self):
for attr_data in everything_signal_info:
self.attr_values[attr_data.name] = attr_data.initial_value
for attr_name in self.attr_values:
self.attr_values[attr_name] = self.initial_values[attr_name]

def read(self, attr):
value = self.attr_values[attr.get_name()]
Expand All @@ -222,6 +193,8 @@ def {}(self, arg):
"""
)

for attr_data in everything_signal_info:
if attr_data.dformat != AttrDataFormat.IMAGE:
for attr_data in _all_attribute_definitions:
if _valid_command(AttrDataFormat.SCALAR, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_cmd"))
if _valid_command(AttrDataFormat.SPECTRUM, attr_data.tango_type):
exec(echo_command_code.format(f"{attr_data.name}_spectrum_cmd"))
Loading

0 comments on commit 82bfccf

Please sign in to comment.