Skip to content

Commit

Permalink
Add deleting finished semaphores
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Feb 21, 2025
1 parent c51f840 commit 4a18954
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ def _check_and_update_parent_state(self) -> None:
Pop the leftmost partition state from _partition_parent_state_map only if
*all partitions* up to (and including) that partition key in _semaphore_per_partition
are fully finished (i.e. in _finished_partitions and semaphore._value == 0).
Additionally, delete finished semaphores with a value of 0 to free up memory,
as they are only needed to track errors and completion status.
"""
last_closed_state = None

Expand All @@ -178,7 +180,9 @@ def _check_and_update_parent_state(self) -> None:

# Verify ALL partitions from the left up to earliest_key are finished
all_left_finished = True
for p_key, sem in self._semaphore_per_partition.items():
for p_key, sem in list(
self._semaphore_per_partition.items()
): # Use list to allow modification during iteration
# If any earlier partition is still not finished, we must stop
if p_key not in self._finished_partitions or sem._value != 0:
all_left_finished = False
Expand All @@ -191,17 +195,26 @@ def _check_and_update_parent_state(self) -> None:
if not all_left_finished:
break

# Otherwise, pop the leftmost entry from parent-state map
# Pop the leftmost entry from parent-state map
_, closed_parent_state = self._partition_parent_state_map.popitem(last=False)
last_closed_state = closed_parent_state

# Update _parent_state if we actually popped at least one partition
# Clean up finished semaphores with value 0 up to and including earliest_key
for p_key in list(self._semaphore_per_partition.keys()):
sem = self._semaphore_per_partition[p_key]
if p_key in self._finished_partitions and sem._value == 0:
del self._semaphore_per_partition[p_key]
logger.debug(f"Deleted finished semaphore for partition {p_key} with value 0")
if p_key == earliest_key:
break

# Update _parent_state if we popped at least one partition
if last_closed_state is not None:
self._parent_state = last_closed_state

def ensure_at_least_one_state_emitted(self) -> None:
"""
The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
The platform expects at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
called.
"""
if not any(
Expand Down Expand Up @@ -238,6 +251,7 @@ def _emit_state_message(self, throttle: bool = True) -> None:
self._message_repository.emit_message(state_message)

def stream_slices(self) -> Iterable[StreamSlice]:
print("stream_slices")
if self._timer.is_running():
raise RuntimeError("stream_slices has been executed more than once.")

Expand Down Expand Up @@ -313,9 +327,9 @@ def _ensure_partition_limit(self) -> None:
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if (
partition_key in self._finished_partitions
and self._semaphore_per_partition[partition_key]._value == 0
if partition_key in self._finished_partitions and (
partition_key not in self._semaphore_per_partition
or self._semaphore_per_partition[partition_key]._value == 0
):
oldest_partition = self._cursor_per_partition.pop(
partition_key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3159,6 +3159,7 @@ def test_given_unfinished_first_parent_partition_no_parent_state_update():
}
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
assert len(cursor._semaphore_per_partition) == 2


def test_given_unfinished_last_parent_partition_with_partial_parent_state_update():
Expand Down Expand Up @@ -3243,6 +3244,7 @@ def test_given_unfinished_last_parent_partition_with_partial_parent_state_update
}
assert mock_cursor_1.stream_slices.call_count == 1 # Called once for each partition
assert mock_cursor_2.stream_slices.call_count == 1 # Called once for each partition
assert len(cursor._semaphore_per_partition) == 1


def test_given_all_partitions_finished_when_close_partition_then_final_state_emitted():
Expand Down Expand Up @@ -3317,6 +3319,7 @@ def test_given_all_partitions_finished_when_close_partition_then_final_state_emi
assert final_state["lookback_window"] == 1
assert cursor._message_repository.emit_message.call_count == 2
assert mock_cursor.stream_slices.call_count == 2 # Called once for each partition
assert len(cursor._semaphore_per_partition) == 1


def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_global_cursor():
Expand Down Expand Up @@ -3377,3 +3380,75 @@ def test_given_partition_limit_exceeded_when_close_partition_then_switch_to_glob
assert "lookback_window" in final_state
assert len(cursor._cursor_per_partition) <= cursor.DEFAULT_MAX_PARTITIONS_NUMBER
assert mock_cursor.stream_slices.call_count == 3 # Called once for each partition


def test_semaphore_cleanup():
# Create two mock cursors with different states for each partition
mock_cursor_1 = MagicMock()
mock_cursor_1.stream_slices.return_value = iter(
[
{"slice1": "data1"},
{"slice2": "data1"}, # First partition slices
]
)
mock_cursor_1.state = {"updated_at": "2024-01-02T00:00:00Z"} # State for partition "1"

mock_cursor_2 = MagicMock()
mock_cursor_2.stream_slices.return_value = iter(
[
{"slice2": "data2"},
{"slice2": "data2"}, # Second partition slices
]
)
mock_cursor_2.state = {"updated_at": "2024-01-03T00:00:00Z"} # State for partition "2"

# Configure cursor factory to return different mock cursors based on partition
cursor_factory_mock = MagicMock()
cursor_factory_mock.create.side_effect = [mock_cursor_1, mock_cursor_2]

cursor = ConcurrentPerPartitionCursor(
cursor_factory=cursor_factory_mock,
partition_router=MagicMock(),
stream_name="test_stream",
stream_namespace=None,
stream_state={},
message_repository=MagicMock(),
connector_state_manager=MagicMock(),
connector_state_converter=MagicMock(),
cursor_field=CursorField(cursor_field_key="updated_at"),
)

# Simulate partitions with unique parent states
slices = [
StreamSlice(partition={"id": "1"}, cursor_slice={}),
StreamSlice(partition={"id": "2"}, cursor_slice={}),
]
cursor._partition_router.stream_slices.return_value = iter(slices)
# Simulate unique parent states for each partition
cursor._partition_router.get_stream_state.side_effect = [
{"parent": {"state": "state1"}}, # Parent state for partition "1"
{"parent": {"state": "state2"}}, # Parent state for partition "2"
]

# Generate slices to populate semaphores and parent states
generated_slices = list(
cursor.stream_slices()
) # Populate _semaphore_per_partition and _partition_parent_state_map

# Verify initial state
assert len(cursor._semaphore_per_partition) == 2
assert len(cursor._partition_parent_state_map) == 2
assert cursor._partition_parent_state_map['{"id":"1"}'] == {"parent": {"state": "state1"}}
assert cursor._partition_parent_state_map['{"id":"2"}'] == {"parent": {"state": "state2"}}

# Close partitions to acquire semaphores (value back to 0)
for s in generated_slices:
cursor.close_partition(DeclarativePartition("test_stream", {}, MagicMock(), MagicMock(), s))

# Check state after closing partitions
assert len(cursor._finished_partitions) == 2
assert len(cursor._semaphore_per_partition) == 0
assert '{"id":"1"}' not in cursor._semaphore_per_partition
assert '{"id":"2"}' not in cursor._semaphore_per_partition
assert len(cursor._partition_parent_state_map) == 0 # All parent states should be popped
assert cursor._parent_state == {"parent": {"state": "state2"}} # Last parent state

0 comments on commit 4a18954

Please sign in to comment.