diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index b71890cce..730f8763d 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -264,7 +264,10 @@ def _set_initial_state(self, stream_state: StreamState) -> None: if not stream_state: return - if self._PERPARTITION_STATE_KEY not in stream_state: + if ( + self._PERPARTITION_STATE_KEY not in stream_state + and self._GLOBAL_STATE_KEY not in stream_state + ): # We assume that `stream_state` is in a global format that can be applied to all partitions. # Example: {"global_state_format_key": "global_state_format_value"} self._global_cursor = deepcopy(stream_state) @@ -273,7 +276,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None: else: self._lookback_window = int(stream_state.get("lookback_window", 0)) - for state in stream_state[self._PERPARTITION_STATE_KEY]: + for state in stream_state.get(self._PERPARTITION_STATE_KEY, []): self._cursor_per_partition[self._to_partition_key(state["partition"])] = ( self._create_cursor(state["cursor"]) )