Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add capacity, oldest and newest timestamp to moving window #598

Merged
merged 4 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@

- A new class `Fuse` has been added to represent fuses. This class has a member variable `max_current` which represents the maximum current that can course through the fuse. If the current flowing through a fuse is greater than this limit, then the fuse will break the circuit.

- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
- `MovingWindow` and `OrderedRingBuffer`:
- NaN values are treated as missing when gaps are determined in the `OrderedRingBuffer`.
- Provide access to `capacity` (maximum number of elements) in `MovingWindow`.
- Methods to retrieve oldest and newest timestamp of valid samples are added to both.


## Bug Fixes

Expand Down
33 changes: 33 additions & 0 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,39 @@ def sampling_period(self) -> timedelta:
"""
return self._sampling_period

@property
def oldest_timestamp(self) -> datetime | None:
"""
Return the oldest timestamp of the MovingWindow.

Returns:
The oldest timestamp of the MovingWindow or None if the buffer is empty.
"""
return self._buffer.oldest_timestamp

@property
def newest_timestamp(self) -> datetime | None:
"""
Return the newest timestamp of the MovingWindow.

Returns:
The newest timestamp of the MovingWindow or None if the buffer is empty.
"""
return self._buffer.newest_timestamp

@property
def capacity(self) -> int:
"""
Return the capacity of the MovingWindow.

Capacity is the maximum number of samples that can be stored in the
MovingWindow.

Returns:
The capacity of the MovingWindow.
"""
return self._buffer.maxlen

async def _run_impl(self) -> None:
"""Awaits samples from the receiver and updates the underlying ring buffer.

Expand Down
31 changes: 30 additions & 1 deletion src/frequenz/sdk/timeseries/_ringbuffer/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,39 @@ def time_bound_newest(self) -> datetime:
Return the time bounds of the ring buffer.

Returns:
The timestamp of the newest sample of the ring buffer.
The timestamp of the newest sample of the ring buffer
or None if the buffer is empty.
"""
return self._datetime_newest

@property
def oldest_timestamp(self) -> datetime | None:
"""Return the oldest timestamp in the buffer.

Returns:
The oldest timestamp in the buffer
or None if the buffer is empty.
"""
if len(self) == 0:
return None

if self.is_missing(self.time_bound_oldest):
return min(g.end for g in self.gaps)

return self.time_bound_oldest

@property
def newest_timestamp(self) -> datetime | None:
"""Return the newest timestamp in the buffer.

Returns:
The newest timestamp in the buffer.
"""
if len(self) == 0:
return None

return self.time_bound_newest

def datetime_to_index(
self, timestamp: datetime, allow_outside_range: bool = False
) -> int:
Expand Down
31 changes: 26 additions & 5 deletions tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@ def fake_time() -> Iterator[time_machine.Coordinates]:


async def push_logical_meter_data(
sender: Sender[Sample[Quantity]], test_seq: Sequence[float]
sender: Sender[Sample[Quantity]],
test_seq: Sequence[float],
start_ts: datetime = UNIX_EPOCH,
) -> None:
"""Push data in the passed sender to mock `LogicalMeter` behaviour.

Starting with the First of January 2023.
Starting with UNIX_EPOCH.

Args:
sender: Sender for pushing resampled samples to the `MovingWindow`.
test_seq: The Sequence that is pushed into the `MovingWindow`.
start_ts: The start timestamp of the `MovingWindow`.
"""
start_ts: datetime = UNIX_EPOCH
for i, j in zip(test_seq, range(0, len(test_seq))):
timestamp = start_ts + timedelta(seconds=j)
await sender.send(Sample(timestamp, Quantity(float(i))))
Expand Down Expand Up @@ -125,8 +127,14 @@ async def test_window_size() -> None:
"""Test the size of the window."""
window, sender = init_moving_window(timedelta(seconds=5))
async with window:
await push_logical_meter_data(sender, range(0, 20))
assert len(window) == 5
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 0, "Window should be empty"
await push_logical_meter_data(sender, range(0, 2))
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 2, "Window should be partially full"
await push_logical_meter_data(sender, range(2, 20))
assert window.capacity == 5, "Wrong window capacity"
assert len(window) == 5, "Window should be full"


# pylint: disable=redefined-outer-name
Expand All @@ -146,6 +154,8 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
input_sampling_period=input_sampling,
resampler_config=resampler_config,
) as window:
assert window.capacity == window_size / output_sampling, "Wrong window capacity"
assert len(window) == 0, "Window should be empty at the beginning"
stream_values = [4.0, 8.0, 2.0, 6.0, 5.0] * 100
for value in stream_values:
timestamp = datetime.now(tz=timezone.utc)
Expand All @@ -157,3 +167,14 @@ async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
assert len(window) == window_size / output_sampling
for value in window: # type: ignore
assert 4.9 < value < 5.1


async def test_timestamps() -> None:
"""Test indexing a window by timestamp."""
window, sender = init_moving_window(timedelta(seconds=5))
async with window:
await push_logical_meter_data(
sender, [1, 2], start_ts=UNIX_EPOCH + timedelta(seconds=1)
)
assert window.oldest_timestamp == UNIX_EPOCH + timedelta(seconds=1)
assert window.newest_timestamp == UNIX_EPOCH + timedelta(seconds=2)
24 changes: 23 additions & 1 deletion tests/timeseries/test_ringbuffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,51 +204,73 @@ def dt(i: int) -> datetime: # pylint: disable=invalid-name
return datetime.fromtimestamp(i, tz=timezone.utc)


def test_gaps() -> None:
def test_gaps() -> None: # pylint: disable=too-many-statements
"""Test gap treatment in ordered ring buffer."""
buffer = OrderedRingBuffer([0.0] * 5, ONE_SECOND)
assert buffer.oldest_timestamp is None
assert buffer.newest_timestamp is None
assert len(buffer) == 0
assert len(buffer.gaps) == 0

buffer.update(Sample(dt(0), Quantity(0)))
assert buffer.oldest_timestamp == dt(0)
assert buffer.newest_timestamp == dt(0)
assert len(buffer) == 1
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(6), Quantity(0)))
assert buffer.oldest_timestamp == dt(6)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 1
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(2), Quantity(2)))
buffer.update(Sample(dt(3), Quantity(3)))
buffer.update(Sample(dt(4), Quantity(4)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 4
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(3), None))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(3), Quantity(np.nan)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(2), Quantity(np.nan)))
assert buffer.oldest_timestamp == dt(4)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 2
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(3), Quantity(3)))
assert buffer.oldest_timestamp == dt(3)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 3
assert len(buffer.gaps) == 2

buffer.update(Sample(dt(2), Quantity(2)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 4
assert len(buffer.gaps) == 1

buffer.update(Sample(dt(5), Quantity(5)))
assert buffer.oldest_timestamp == dt(2)
assert buffer.newest_timestamp == dt(6)
assert len(buffer) == 5
assert len(buffer.gaps) == 0

buffer.update(Sample(dt(99), None))
assert buffer.oldest_timestamp == dt(95) # bug: should be None
assert buffer.newest_timestamp == dt(99) # bug: should be None
assert len(buffer) == 4 # bug: should be 0 (whole range gap)
assert len(buffer.gaps) == 1

Expand Down