Skip to content

Commit

Permalink
Fix migration from python global state format
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jan 24, 2025
1 parent c1eea6a commit 2b0a999
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
if not stream_state:
return

if self._PERPARTITION_STATE_KEY not in stream_state:
if (
self._PERPARTITION_STATE_KEY not in stream_state
and self._GLOBAL_STATE_KEY not in stream_state
):
# We assume that `stream_state` is in a global format that can be applied to all partitions.
# Example: {"global_state_format_key": "global_state_format_value"}
self._global_cursor = deepcopy(stream_state)
Expand All @@ -273,7 +276,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
else:
self._lookback_window = int(stream_state.get("lookback_window", 0))

for state in stream_state[self._PERPARTITION_STATE_KEY]:
for state in stream_state.get(self._PERPARTITION_STATE_KEY, []):
self._cursor_per_partition[self._to_partition_key(state["partition"])] = (
self._create_cursor(state["cursor"])
)
Expand Down

0 comments on commit 2b0a999

Please sign in to comment.