Skip to content

Commit

Permalink
ref: specify stream_slice check
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 committed Jan 30, 2025
1 parent c87671e commit 0466cc7
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 7 deletions.
4 changes: 3 additions & 1 deletion airbyte_cdk/sources/declarative/declarative_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ def read_records(
"""
:param: stream_state We knowingly avoid using stream_state as we want cursors to manage their own state.
"""
if stream_slice is None or stream_slice == {}:
if stream_slice is None or (
not isinstance(stream_slice, StreamSlice) and stream_slice == {}
):
# As the parameter is Optional, many would just call `read_records(sync_mode)` during testing without specifying the field
# As part of the declarative model without custom components, this should never happen as the CDK would wire up a
# SinglePartitionRouter that would create this StreamSlice properly
Expand Down
7 changes: 1 addition & 6 deletions unit_tests/sources/declarative/async_job/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@
_NO_LIMIT = 10000


class MockAsyncJobPartitionRouter(AsyncJobPartitionRouter):
def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
yield from [{"record_field": 10}]


class MockAsyncJobRepository(AsyncJobRepository):
def start(self, stream_slice: StreamSlice) -> AsyncJob:
return AsyncJob("a_job_id", stream_slice)
Expand Down Expand Up @@ -87,7 +82,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config={},
parameters={},
record_selector=noop_record_selector,
stream_slicer=MockAsyncJobPartitionRouter(
stream_slicer=AsyncJobPartitionRouter(
stream_slicer=self._stream_slicer,
job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
MockAsyncJobRepository(),
Expand Down

0 comments on commit 0466cc7

Please sign in to comment.