diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 730f8763d..74442b968 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -147,7 +147,7 @@ def close_partition(self, partition: Partition) -> None: < cursor.state[self.cursor_field.cursor_field_key] ): self._new_global_cursor = copy.deepcopy(cursor.state) - self._emit_state_message() + self._emit_state_message() def ensure_at_least_one_state_emitted(self) -> None: """ @@ -192,7 +192,8 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St self._global_cursor, self._lookback_window if self._global_cursor else 0, ) - self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor + with self._lock: + self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( threading.Semaphore(0) ) @@ -210,16 +211,38 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St def _ensure_partition_limit(self) -> None: """ - Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped. + Ensure the maximum number of partitions does not exceed the predefined limit. + + Steps: + 1. Attempt to remove partitions that are marked as finished in `_finished_partitions`. + These partitions are considered processed and safe to delete. + 2. If the limit is still exceeded and no finished partitions are available for removal, + remove the oldest partition unconditionally. We expect failed partitions to be removed. + + Logging: + - Logs a warning each time a partition is removed, indicating whether it was finished + or removed due to being the oldest. """ - while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: - self._over_limit += 1 - oldest_partition = self._cursor_per_partition.popitem(last=False)[ - 0 - ] # Remove the oldest partition - logger.warning( - f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." - ) + with self._lock: + while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: + # Try removing finished partitions first + for partition_key in list(self._cursor_per_partition.keys()): + if partition_key in self._finished_partitions: + oldest_partition = self._cursor_per_partition.pop( + partition_key + ) # Remove the oldest partition + logger.warning( + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + ) + break + else: + # If no finished partitions can be removed, fall back to removing the oldest partition + oldest_partition = self._cursor_per_partition.popitem(last=False)[ + 1 + ] # Remove the oldest partition + logger.warning( + f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}." + ) def _set_initial_state(self, stream_state: StreamState) -> None: """