Skip to content

Commit

Permalink
Fix partitions count
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Feb 17, 2025
1 parent 7b4964e commit ef8be89
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def _generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[St
self._lookback_window if self._global_cursor else 0,
)
with self._lock:
self._number_of_partitions += 1
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
self._semaphore_per_partition[self._to_partition_key(partition.partition)] = (
threading.Semaphore(0)
Expand Down Expand Up @@ -253,7 +254,6 @@ def _ensure_partition_limit(self) -> None:
self._use_global_cursor = True

with self._lock:
self._number_of_partitions += 1
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
# Try removing finished partitions first
for partition_key in list(self._cursor_per_partition.keys()):
Expand Down Expand Up @@ -334,6 +334,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
self._lookback_window = int(stream_state.get("lookback_window", 0))

for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
self._number_of_partitions += 1
self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
self._create_cursor(state["cursor"])
)
Expand Down

0 comments on commit ef8be89

Please sign in to comment.