diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index efa5996b3..ddcba0470 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -215,6 +215,7 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St self._lookback_window if self._global_cursor else 0, ) with self._lock: + self._number_of_partitions += 1 self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor self._semaphore_per_partition[self._to_partition_key(partition.partition)] = ( threading.Semaphore(0) @@ -253,7 +254,6 @@ def _ensure_partition_limit(self) -> None: self._use_global_cursor = True with self._lock: - self._number_of_partitions += 1 while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1: # Try removing finished partitions first for partition_key in list(self._cursor_per_partition.keys()): @@ -334,6 +334,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: self._lookback_window = int(stream_state.get("lookback_window", 0)) for state in stream_state.get(self._PERPARTITION_STATE_KEY, []): + self._number_of_partitions += 1 self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) )