From daa6873f02983fd7d49e1576f97a3dc76c3378dc Mon Sep 17 00:00:00 2001 From: Anatolii Yatsuk Date: Thu, 16 Jan 2025 19:28:25 +0200 Subject: [PATCH] Add exception if stream_slices was executed two times --- .../incremental/concurrent_partition_cursor.py | 9 ++++++--- .../declarative/incremental/global_substream_cursor.py | 3 +++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 37d2eb8da..2e6d5d2b9 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -1,9 +1,9 @@ -import copy -import logging - # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + +import copy +import logging import threading from collections import OrderedDict from copy import deepcopy @@ -171,6 +171,9 @@ def _emit_state_message(self) -> None: self._message_repository.emit_message(state_message) def stream_slices(self) -> Iterable[StreamSlice]: + if self._timer.is_running(): + raise RuntimeError("stream_slices has been executed more than once.") + slices = self._partition_router.stream_slices() self._timer.start() for partition in slices: diff --git a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py b/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py index 3b3636236..f5439b9ce 100644 --- a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py @@ -64,6 +64,9 @@ def finish(self) -> int: else: raise RuntimeError("Global substream cursor timer not started") + def is_running(self) -> bool: + return self._start is not None + class GlobalSubstreamCursor(DeclarativeCursor): """