Skip to content

Commit

Permalink
Add lock for _cursor_per_partition
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jan 24, 2025
1 parent d2b0917 commit 8bbb828
Showing 1 changed file with 34 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def close_partition(self, partition: Partition) -> None:
< cursor.state[self.cursor_field.cursor_field_key]
):
self._new_global_cursor = copy.deepcopy(cursor.state)
self._emit_state_message()
self._emit_state_message()

def ensure_at_least_one_state_emitted(self) -> None:
"""
Expand Down Expand Up @@ -192,7 +192,8 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
self._global_cursor,
self._lookback_window if self._global_cursor else 0,
)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
with self._lock:
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
threading.Semaphore(0)
)
Expand All @@ -210,16 +211,38 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St

def _ensure_partition_limit(self) -> None:
"""
Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
Ensure the maximum number of partitions does not exceed the predefined limit.
Steps:
1. Attempt to remove partitions that are marked as finished in `_finished_partitions`.
These partitions are considered processed and safe to delete.
2. If the limit is still exceeded and no finished partitions are available for removal,
remove the oldest partition unconditionally. We expect failed partitions to be removed.
Logging:
- Logs a warning each time a partition is removed, indicating whether it was finished
or removed due to being the oldest.
"""
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
oldest_partition = self._cursor_per_partition.popitem(last=False)[
0
] # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
)
with self._lock:
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
if partition_key in self._finished_partitions:
oldest_partition = self._cursor_per_partition.pop(
partition_key
) # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
)
break
else:
# If no finished partitions can be removed, fall back to removing the oldest partition
oldest_partition = self._cursor_per_partition.popitem(last=False)[
1
] # Remove the oldest partition
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
)

def _set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand Down

0 comments on commit 8bbb828

Please sign in to comment.