From ae5fa9b471febcccbb4475e6bda4efd0cc6247fb Mon Sep 17 00:00:00 2001 From: daniel-zullo-frequenz <120166726+daniel-zullo-frequenz@users.noreply.github.com> Date: Wed, 20 Dec 2023 12:49:31 +0100 Subject: [PATCH] Process voltage phases dynamically Apply suggestions from code review. Co-authored-by: Leandro Lucarella Signed-off-by: daniel-zullo-frequenz <120166726+daniel-zullo-frequenz@users.noreply.github.com> --- .../sdk/timeseries/_voltage_streaming.py | 45 +++++++------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_voltage_streaming.py b/src/frequenz/sdk/timeseries/_voltage_streaming.py index 78f2ecc95..6559518dd 100644 --- a/src/frequenz/sdk/timeseries/_voltage_streaming.py +++ b/src/frequenz/sdk/timeseries/_voltage_streaming.py @@ -133,25 +133,16 @@ async def _send_request(self) -> None: ComponentMetricRequest, ) - def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest: - return ComponentMetricRequest( - self._namespace, - self._source_component.component_id, - phase, - None, - ) - - phase_1_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_1) - phase_2_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_2) - phase_3_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_3) - - await self._resampler_subscription_sender.send(phase_1_req) - await self._resampler_subscription_sender.send(phase_2_req) - await self._resampler_subscription_sender.send(phase_3_req) - - phase_1_rx = self._channel_registry.new_receiver(phase_1_req.get_channel_name()) - phase_2_rx = self._channel_registry.new_receiver(phase_2_req.get_channel_name()) - phase_3_rx = self._channel_registry.new_receiver(phase_3_req.get_channel_name()) + phases_rx: list[Receiver[Sample[Quantity]]] = [] + for metric_id in (ComponentMetricId.VOLTAGE_PHASE_1, ...): + req = ComponentMetricRequest( + self._namespace, + self._source_component.component_id, + metric_id, + None, + ) + await self._resampler_subscription_sender.send(req) + phases_rx.append(self._channel_registry.new_receiver(req.get_channel_name()) sender = self._channel_registry.new_sender(self._channel_key) @@ -161,25 +152,19 @@ def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest: while True: try: - phase_1 = await phase_1_rx.receive() - phase_2 = await phase_2_rx.receive() - phase_3 = await phase_3_rx.receive() + phases = [await r.receive() for r in phases_rx] - if phase_1 is None or phase_2 is None or phase_3 is None: + if not all(map(lambda x: x is not None, phases)): _logger.warning( - "Received None from voltage request: %s (%s, %s, %s)", + "Received None from voltage request: %s %s", self._source_component, - phase_1, - phase_2, - phase_3, + phases, ) continue msg = Sample3Phase( phase_1.timestamp, - Voltage.from_volts(phase_1.value.base_value), - Voltage.from_volts(phase_2.value.base_value), - Voltage.from_volts(phase_3.value.base_value), + *map(lambda p: Voltage.from_volts(p.value.base_value), phases) ) except asyncio.CancelledError: _logger.exception(