Skip to content

Commit

Permalink
Process voltage phases dynamically
Browse files Browse the repository at this point in the history
Apply suggestions from code review.

Co-authored-by: Leandro Lucarella <luca@llucax.com>
Signed-off-by: daniel-zullo-frequenz <120166726+daniel-zullo-frequenz@users.noreply.github.com>
  • Loading branch information
daniel-zullo-frequenz and llucax authored Dec 20, 2023
1 parent 8f2194e commit ae5fa9b
Showing 1 changed file with 15 additions and 30 deletions.
45 changes: 15 additions & 30 deletions src/frequenz/sdk/timeseries/_voltage_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,25 +133,16 @@ async def _send_request(self) -> None:
ComponentMetricRequest,
)

def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest:
return ComponentMetricRequest(
self._namespace,
self._source_component.component_id,
phase,
None,
)

phase_1_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_1)
phase_2_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_2)
phase_3_req = _create_request(ComponentMetricId.VOLTAGE_PHASE_3)

await self._resampler_subscription_sender.send(phase_1_req)
await self._resampler_subscription_sender.send(phase_2_req)
await self._resampler_subscription_sender.send(phase_3_req)

phase_1_rx = self._channel_registry.new_receiver(phase_1_req.get_channel_name())
phase_2_rx = self._channel_registry.new_receiver(phase_2_req.get_channel_name())
phase_3_rx = self._channel_registry.new_receiver(phase_3_req.get_channel_name())
phases_rx: list[Receiver[Sample[Quantity]]] = []
for metric_id in (ComponentMetricId.VOLTAGE_PHASE_1, ...):
req = ComponentMetricRequest(
self._namespace,
self._source_component.component_id,
metric_id,
None,
)
await self._resampler_subscription_sender.send(req)
phases_rx.append(self._channel_registry.new_receiver(req.get_channel_name())

sender = self._channel_registry.new_sender(self._channel_key)

Expand All @@ -161,25 +152,19 @@ def _create_request(phase: ComponentMetricId) -> ComponentMetricRequest:

while True:
try:
phase_1 = await phase_1_rx.receive()
phase_2 = await phase_2_rx.receive()
phase_3 = await phase_3_rx.receive()
phases = [await r.receive() for r in phases_rx]

if phase_1 is None or phase_2 is None or phase_3 is None:
if not all(map(lambda x: x is not None, phases)):
_logger.warning(
"Received None from voltage request: %s (%s, %s, %s)",
"Received None from voltage request: %s %s",
self._source_component,
phase_1,
phase_2,
phase_3,
phases,
)
continue

msg = Sample3Phase(
phase_1.timestamp,
Voltage.from_volts(phase_1.value.base_value),
Voltage.from_volts(phase_2.value.base_value),
Voltage.from_volts(phase_3.value.base_value),
*map(lambda p: Voltage.from_volts(p.value.base_value), phases)
)
except asyncio.CancelledError:
_logger.exception(
Expand Down

0 comments on commit ae5fa9b

Please sign in to comment.