Skip to content

Commit

Permalink
Add exception if stream_slices was executed two times
Browse files Browse the repository at this point in the history
  • Loading branch information
tolik0 committed Jan 16, 2025
1 parent df0993e commit daa6873
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import copy
import logging

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import copy
import logging
import threading
from collections import OrderedDict
from copy import deepcopy
Expand Down Expand Up @@ -171,6 +171,9 @@ def _emit_state_message(self) -> None:
self._message_repository.emit_message(state_message)

def stream_slices(self) -> Iterable[StreamSlice]:
if self._timer.is_running():
raise RuntimeError("stream_slices has been executed more than once.")

slices = self._partition_router.stream_slices()
self._timer.start()
for partition in slices:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def finish(self) -> int:
else:
raise RuntimeError("Global substream cursor timer not started")

def is_running(self) -> bool:
return self._start is not None


class GlobalSubstreamCursor(DeclarativeCursor):
"""
Expand Down

0 comments on commit daa6873

Please sign in to comment.