-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: use create_concurrent_cursor_from_perpartition_cursor #286
feat: use create_concurrent_cursor_from_perpartition_cursor #286
Conversation
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Note to self: A working 185fe42 with |
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Note to review: in d6c4870 some signatures were changed, but I guess it should not affect any connectors |
📝 WalkthroughWalkthroughThe pull request introduces significant modifications to the Airbyte CDK's declarative source handling, focusing on enhancing asynchronous job processing and stream slicing. The changes primarily revolve around updating method signatures, improving type handling, and refactoring how async jobs and partitions are managed across multiple components like Changes
Possibly related issues
Possibly related PRs
Suggested reviewers
Wdyt? Would you like to explore any specific aspect of these changes further? 😊 ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (12)
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (3)
9-9
: Consider removing unused import?
It looks likeAsyncPartition
isn’t used anymore. Removing it could help keep the codebase tidy, wdyt?
59-61
: Update the docstring to match the new return type?
The docstring references returningAsyncPartition
, but the code now yields anIterable[AsyncJob]
. Would you like to revise it, wdyt?
86-87
: Ensure fallback logic for no-jobs case?
Whenjobs
is empty, the call tofetch_records
may yield nothing. Do we need a fallback or is skipping records acceptable, wdyt?airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (2)
45-47
: Consider renaming 'partition' for clarity?
Now that we store jobs inextra_fields
, maybe rename'partition'
to something like'slice_metadata'
so readers don’t mix it up with the new job concept, wdyt?
50-50
: Param rename suggestion for clarity?
Renamingpartition: Iterable[AsyncJob]
to something likejobs: Iterable[AsyncJob]
might be clearer, wdyt?unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (3)
38-41
: Rename local variable to reduce confusion?
Because it’s now aStreamSlice
, would renamingpartition
toslice_
orstream_slice
help readers, wdyt?
43-43
: Possible rename for better clarity?
attempts_per_job
is essentially a list of jobs. Calling itjobs
might emphasize that, wdyt?
74-74
: Consistent naming for list of jobs?
Same observation: maybe renameattempts_per_job
tojobs
for uniformity, wdyt?unit_tests/sources/declarative/async_job/test_integration.py (1)
39-42
: Consider enhancing the mock implementation.The mock implementation is quite basic. Would it be helpful to add more test scenarios, wdyt? For example:
- Multiple records with different field values
- Error cases
- Different job statuses
airbyte_cdk/sources/declarative/declarative_stream.py (1)
149-152
: Type check enhancement looks good.The addition of
AsyncPartition
to the type check improves type safety. However, would it be worth adding a more descriptive error message that explains what types are supported, wdyt?- raise ValueError( - f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}" - ) + raise ValueError( + f"DeclarativeStream only supports stream_slices of type StreamSlice or AsyncPartition. Got {type(stream_slice)}" + )airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)
Line range hint
485-497
: LGTM! Consider enhancing the docstring?The change to accept
Iterable[AsyncJob]
directly instead ofAsyncPartition
looks good as it simplifies the interface. Would you consider updating the docstring to reflect this change? Something like:def fetch_records(self, partition: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]: """ Fetches records from the given jobs. Args: - partition (AsyncPartition): The partition containing the jobs. + partition (Iterable[AsyncJob]): An iterable collection of jobs to fetch records from. Yields: Iterable[Mapping[str, Any]]: The fetched records from the jobs. """airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)
Line range hint
1668-1692
: Consider extracting the cursor creation logic?The cursor creation logic is quite complex with multiple conditions and parameters. Would you consider extracting it into a separate private method for better readability? Something like
_create_async_retriever_cursor
? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
(2 hunks)airbyte_cdk/sources/declarative/concurrent_declarative_source.py
(2 hunks)airbyte_cdk/sources/declarative/declarative_stream.py
(2 hunks)airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
(2 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
(2 hunks)airbyte_cdk/sources/declarative/retrievers/async_retriever.py
(4 hunks)unit_tests/sources/declarative/async_job/test_integration.py
(2 hunks)unit_tests/sources/declarative/async_job/test_job_orchestrator.py
(1 hunks)unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py
(2 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
[error] 343-343: "StreamSlice" has no attribute "stream_slice"; maybe "_stream_slice"?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1675-1675: Incompatible return value type (got "ConcurrentPerPartitionCursor", expected "StreamSlicer | None")
⏰ Context from checks skipped due to timeout of 90000ms (5)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/retrievers/async_retriever.py (1)
75-75
: Verify empty-jobs scenario?
If astream_slice
doesn’t have"jobs"
, we return an empty list. Is this an expected case, or should we raise an error, wdyt?airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py (1)
7-7
: Import addition looks good!
TheAsyncJob
import aligns with the new job-based approach perfectly, wdyt?unit_tests/sources/declarative/partition_routers/test_async_job_partition_router.py (1)
71-72
: Good assertion forStreamSlice
type!
Verifying each slice’s type is a nice check, wdyt?unit_tests/sources/declarative/async_job/test_integration.py (1)
Line range hint
90-98
: LGTM! Clean integration of the mock router.The mock router is well-integrated into the
MockSource
class, maintaining consistency with the test setup.unit_tests/sources/declarative/async_job/test_job_orchestrator.py (1)
178-178
: LGTM! Test updated to match new signature.The test case correctly reflects the change to accept an iterable of jobs.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
22-22
: LGTM! New import for ConcurrentPerPartitionCursor.Clean import addition that supports the new cursor functionality.
Line range hint
1634-1634
: LGTM! Enhanced type checking with union operator.The use of
|
operator for union types is clean and modern. Just note that this requires Python 3.10+. Are we sure all environments meet this requirement?✅ Verification successful
Python version requirement is already enforced ✨
The project's
pyproject.toml
explicitly requires Python 3.10 or higher (^3.10,<3.13
), which perfectly aligns with the union operator requirement. The codebase is also actively tested against Python 3.10 and 3.11 in CI.🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Check Python version requirement in setup.py grep "python_requires" setup.pyLength of output: 73
Script:
#!/bin/bash # Look for Python configuration files and check their content echo "=== Searching for Python config files ===" fd -e toml -e txt -e yaml -e yml . | grep -iE 'py|req|setup|workflow' echo -e "\n=== Checking pyproject.toml if exists ===" cat pyproject.toml 2>/dev/null || echo "No pyproject.toml found" echo -e "\n=== Checking requirements files ===" fd requirements.txt -x cat {} echo -e "\n=== Checking GitHub workflows ===" fd -e yml -e yaml . .github/workflows/ -x cat {}Length of output: 64537
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Outdated
Show resolved
Hide resolved
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is for us to respect the APIs of the object we return so that we don't have to do isinstance
. Can we confirm that? We can have a sync to figure out a solution my comments aren't enough
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
…ice-and-cursor' into artem1205/fix-async-retriever-slice-and-cursor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
1634-1634
: Consider using Python's type union operator.The use of
SimpleRetrieverModel | AsyncRetrieverModel
is a nice way to leverage Python's union operator. However, for consistency with the rest of the codebase, would you consider usingUnion[SimpleRetrieverModel, AsyncRetrieverModel]
instead? This would maintain compatibility with older Python versions too, wdyt?- and isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel) + and isinstance(model, Union[SimpleRetrieverModel, AsyncRetrieverModel])
1668-1673
: Consider improving error message clarity.The error message about AsyncRetriever's limitations is informative but could be more structured. Would you consider breaking it into bullet points for better readability, wdyt?
- raise ValueError( - "AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet" - ) + raise ValueError( + "AsyncRetriever limitations:\n" + "- Only DatetimeBasedCursor is currently supported\n" + "- Support for other cursor types is planned for future releases" + )
1685-1691
: Consider adding type hints for config parameter.The code uses
config or {}
which implies config could be None, but the type hints in the method signature might not reflect this. Would you consider updating the type hints, wdyt?def create_concurrent_cursor_from_datetime_based_cursor( self, model_type: Type[BaseModel], component_definition: ComponentDefinition, stream_name: str, stream_namespace: Optional[str], - config: Config, + config: Optional[Config], **kwargs: Any, ) -> ConcurrentCursor:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1674-1684
: Consider adding runtime validation for stream name.The code passes an empty string as the stream name if
model.name
is None. This could lead to issues with state management or logging. Would you consider adding a validation check, wdyt?+ if not model.name: + raise ValueError("Stream name cannot be empty for ConcurrentPerPartitionCursor") return self.create_concurrent_cursor_from_perpartition_cursor( state_manager=self._connector_state_manager, model_type=DatetimeBasedCursorModel, component_definition=model.incremental_sync.__dict__, - stream_name=model.name or "", + stream_name=model.name, stream_namespace=None, config=config or {}, stream_state={}, partition_router=stream_slicer, )Let's verify the impact:
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/declarative_stream.py (1)
141-143
: LGTM! Consider enhancing the error message for better debugging?The type validation logic looks good! For better debugging experience, what do you think about including the actual type in the error message? Something like:
- f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {stream_slice}" + f"DeclarativeStream does not support stream_slices that are not StreamSlice. Got {type(stream_slice).__name__}: {stream_slice}"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py
(1 hunks)airbyte_cdk/sources/declarative/declarative_stream.py
(1 hunks)airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
(3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/partition_routers/async_job_partition_router.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/async_job/job_orchestrator.py (1)
485-495
: LGTM! Clean refactor to simplify the interface.The change from
AsyncPartition
toIterable[AsyncJob]
makes the method more focused and flexible. Documentation is clear and implementation is straightforward.
fixed, thanks for pointing out. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1659-1659
: Consider using Union for better compatibility?The
|
operator for union types was introduced in Python 3.10. While it's more concise, usingUnion
fromtyping
would maintain compatibility with older Python versions. WDYT?- and isinstance(model, SimpleRetrieverModel | AsyncRetrieverModel) + and isinstance(model, Union[SimpleRetrieverModel, AsyncRetrieverModel])
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (7)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-the-guardian-api' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
1693-1698
: LGTM! Clear error handling for unsupported cursor types.The error message clearly explains the current limitation of AsyncRetriever to DatetimeBasedCursor and provides context about unordered slices. This helps developers understand why the limitation exists.
1699-1709
: LGTM! Robust concurrent cursor creation with partition router.The code correctly handles the case when a partition router exists by creating a ConcurrentPerPartitionCursor with all necessary parameters. The type ignore comment is well-documented, explaining why it's safe to ignore the StreamSlicer interface mismatch.
1710-1716
: LGTM! Fallback to regular concurrent cursor.When no partition router exists, the code falls back to creating a regular ConcurrentCursor. The type ignore comment is carried over with the same justification about the StreamSlicer interface.
upd: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thanks for addressing the concerns
What
resolving airbytehq/airbyte#48449
How
add concurrent_cursor_from_perpartition_cursor to asyncretriever
User Impact
No impact is expected. This is not a breaking change.
Summary by CodeRabbit
New Features
Bug Fixes
Refactor