feat(low code): Add GroupingPartitionRouter #354
📝 Walkthrough
This pull request introduces a new component called GroupingPartitionRouter for declarative data extraction. It allows batches of partition keys to be grouped into slices with a configurable size and optional deduplication. The changes span YAML schema updates, new and modified Python classes in models, a factory method for component instantiation, and export adjustments to expose the new router. Existing retriever components are updated to support the new grouping functionality.
Sequence Diagram(s)
sequenceDiagram
participant Client
participant Factory as ModelToComponentFactory
participant Underlying as PartitionRouter
participant Grouping as GroupingPartitionRouter
Client->>Factory: create_grouping_partition_router(model, config)
Factory->>Factory: _create_component_from_model(model)
Factory-->>Underlying: Instantiate underlying partition router
Factory->>Factory: Validate instance is a PartitionRouter
alt Valid underlying router
Factory->>Grouping: Initialize GroupingPartitionRouter with parameters
Grouping-->>Factory: GroupingPartitionRouter instance created
else Invalid underlying router
Factory-->>Client: Raise ValueError
end
Factory-->>Client: Return GroupingPartitionRouter instance
sequenceDiagram
participant Grouping as GroupingPartitionRouter
participant Underlying as UnderlyingPartitionRouter
participant Client as DataConsumer
Client->>Grouping: Request stream slices
Grouping->>Underlying: Call stream_slices()
loop Processing partitions
Underlying-->>Grouping: Return partition slice
Grouping->>Grouping: Buffer slice (apply deduplication if enabled)
alt Batch size reached or stream end
Grouping-->>Client: Yield grouped slice batch
end
end
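The buffering loop in the second diagram can be sketched in a few lines of plain Python. This is a minimal illustration rather than the PR's implementation: `group_partitions` and the `Partition` alias are hypothetical names, partitions are modelled as plain dicts instead of the CDK's slice objects, and deduplication is applied across the whole stream, as in the code under review.

```python
from typing import Any, Dict, Iterable, Iterator, List, Set

# Stand-in for the CDK's slice type (an assumption made for this sketch).
Partition = Dict[str, Any]


def group_partitions(
    partitions: Iterable[Partition],
    group_size: int,
    deduplicate: bool = True,
) -> Iterator[List[Partition]]:
    """Lazily yield lists of at most `group_size` partitions."""
    if group_size < 1:
        raise ValueError("group_size must be >= 1")
    batch: List[Partition] = []
    seen_keys: Set[Any] = set()
    for partition in partitions:
        # Assume a single key-value pair per partition, as the PR does.
        key = next(iter(partition.values()), None)
        if deduplicate:
            if key in seen_keys:
                continue  # skip a key that was already emitted
            seen_keys.add(key)
        batch.append(partition)
        if len(batch) == group_size:
            yield batch
            batch = []
    if batch:
        # Stream end: flush the final, possibly partial, batch.
        yield batch


boards = [{"board_id": i} for i in (1, 2, 2, 3, 4, 5)]
groups = list(group_partitions(boards, group_size=2))
ungrouped = list(group_partitions(boards, group_size=2, deduplicate=False))
```

Because the function is a generator, the underlying partitions are consumed only as batches are requested, which mirrors the "iterate lazily" intent of the router.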
Actionable comments posted: 4
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
12-24: Nitpick: Clarify mention of SinglePartitionRouter in the docstring.
In the docstring (line 20), the router is referred to as a SinglePartitionRouter, but the code uses PartitionRouter. Might be worth updating to maintain consistency and reduce confusion, wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
3133-3141: Integration in SimpleRetriever: Inclusion of GroupingPartitionRouter
It looks like you added references to the new GroupingPartitionRouter inside the partition_router allowed types for SimpleRetriever. This should enable users to choose the grouping strategy for partitioning. Have you validated that the ordering and overall schema constraints are still correct here? wdyt?
3295-3302: Integration in AsyncRetriever: Inclusion of GroupingPartitionRouter
Similar to the SimpleRetriever, the AsyncRetriever now accepts GroupingPartitionRouter in its partition_router field. This is a great step toward consistency. Would you mind confirming that the asynchronous flow correctly processes grouped partitions as expected in practice? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3 hunks)
airbyte_cdk/sources/declarative/partition_routers/__init__.py (2 hunks)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py
[error] 43-43: Need type annotation for 'seen_keys'
[error] 51-51: Unsupported right operand type for in ('set[Any] | None')
[error] 57-57: Item 'None' of 'set[Any] | None' has no attribute 'add'
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 3056-3056: Argument 'model' to '_create_component_from_model' of 'ModelToComponentFactory' has incompatible type 'CustomPartitionRouter | ListPartitionRouter | SubstreamPartitionRouter | None'; expected 'BaseModel'
[error] 3064-3064: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 2376-2376: Invalid type comment or annotation. Suggestion: use conint[...] instead of conint(...)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1)
68-117: LGTM for request/response methods.
The request/response components and their delegations appear solid and maintainable. wdyt?
airbyte_cdk/sources/declarative/partition_routers/__init__.py (1)
11-13: Looks good!
Exporting GroupingPartitionRouter here cleanly integrates it into your module’s public API, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
230-232: LGTM! Import follows the established pattern.
seen_keys = set() if self.deduplicate else None

# Iterate over partitions lazily from the underlying router
for partition in self.underlying_partition_router.stream_slices():
    # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
    key = next(iter(partition.partition.values()), None)

    # Skip duplicates if deduplication is enabled
    if self.deduplicate and key in seen_keys:
        continue

    # Add partition to the batch
    batch.append(partition)
    if self.deduplicate:
        seen_keys.add(key)
Fix typing to address pipeline errors for deduplication logic.
Currently, defining seen_keys as None when deduplicate is False triggers type checker issues:
- Unsupported right operand type for in (line 51)
- 'NoneType' object has no attribute 'add' (line 57)
- Need type annotation for 'seen_keys' (line 43)
One fix is to always define seen_keys as a set and only use it if deduplicate is True. For example (with `from typing import Any, Set` added to the module's imports rather than inside the method):
 batch = []
-seen_keys = set() if self.deduplicate else None
+seen_keys: Set[Any] = set()
 # Iterate over partitions lazily from the underlying router
 for partition in self.underlying_partition_router.stream_slices():
     # ...
     if self.deduplicate and key in seen_keys:
         continue
     batch.append(partition)
     if self.deduplicate:
         seen_keys.add(key)
Would you like to adopt this approach, wdyt?
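An alternative to the always-a-set approach is to keep the None sentinel but annotate it and narrow on `is not None`, which type checkers such as mypy understand. The sketch below is not the reviewer's suggestion; `unique_partitions` is a hypothetical helper operating on plain dicts rather than the CDK's slice objects.

```python
from typing import Any, Dict, Iterable, List, Optional, Set


def unique_partitions(
    partitions: Iterable[Dict[str, Any]], deduplicate: bool
) -> List[Dict[str, Any]]:
    """Return partitions in order, dropping repeated keys when requested."""
    # The explicit annotation resolves "Need type annotation for 'seen_keys'".
    seen_keys: Optional[Set[Any]] = set() if deduplicate else None
    kept: List[Dict[str, Any]] = []
    for partition in partitions:
        key = next(iter(partition.values()), None)
        # Narrowing on `is not None` makes `in` and `.add` type-safe.
        if seen_keys is not None:
            if key in seen_keys:
                continue
            seen_keys.add(key)
        kept.append(partition)
    return kept


sample = [{"a": 1}, {"a": 1}, {"a": 2}]
deduped = unique_partitions(sample, deduplicate=True)
untouched = unique_partitions(sample, deduplicate=False)
```

Both variants behave the same at runtime; the choice is mostly about whether an unused empty set or an extra `None` check reads better in context.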
🧰 Tools
🪛 GitHub Actions: Linters
[error] 43-43: Need type annotation for 'seen_keys'
[error] 51-51: Unsupported right operand type for in ('set[Any] | None')
[error] 57-57: Item 'None' of 'set[Any] | None' has no attribute 'add'
class GroupingPartitionRouter(BaseModel):
    type: Literal["GroupingPartitionRouter"]
    group_size: conint(ge=1) = Field(
        ...,
        description="The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.",
        examples=[10, 50],
        title="Group Size",
    )
    partition_router: Optional[
        Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]
    ] = Field(
        None,
        description="The partition router whose output will be grouped. This can be any valid partition router component.",
        title="Underlying Partition Router",
    )
    deduplicate: Optional[bool] = Field(
        True,
        description="If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.",
        title="Deduplicate Partitions",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Fix the conint type annotation error.
The pipeline flags conint(ge=1) as an invalid type annotation because it is a call expression. The linter's generic hint is to use brackets, but conint[ge=1] is not valid Python syntax, since keyword arguments cannot appear inside a subscript. One option is to annotate the field as int and move the constraint into Field:
-    group_size: conint(ge=1) = Field(
-        ...,
+    group_size: int = Field(
+        ...,
+        ge=1,
         description="The number of partitions...",
         examples=[10, 50],
         title="Group Size",
     )
Would you like to update it this way, wdyt?
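Whether a given annotation spelling is even syntactically valid can be checked with the standard library alone. The sketch below runs `ast.parse` over three candidate spellings of the `group_size` field; the helper name `parses` is hypothetical, and the check says nothing about which form the generated models file should ultimately use.

```python
import ast


def parses(source: str) -> bool:
    """Return True if `source` is syntactically valid Python."""
    try:
        ast.parse(source)
    except SyntaxError:
        return False
    return True


# Only the syntax is checked; none of these names need to be importable.
call_form = "group_size: conint(ge=1) = Field(...)"
bracket_form = "group_size: conint[ge=1] = Field(...)"
field_form = "group_size: int = Field(..., ge=1)"

results = {
    "call": parses(call_form),
    "bracket": parses(bracket_form),
    "field": parses(field_form),
}
```

The call form parses but is rejected by the type checker as an annotation, while the bracket form with a keyword argument does not parse at all, so it cannot be a drop-in replacement.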
🧰 Tools
🪛 GitHub Actions: Linters
[error] 2376-2376: Invalid type comment or annotation. Suggestion: use conint[...] instead of conint(...)
def create_grouping_partition_router(
    self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
) -> GroupingPartitionRouter:
    underlying_router = self._create_component_from_model(
        model=model.partition_router, config=config
    )

    if not isinstance(underlying_router, PartitionRouter):
        raise ValueError(
            f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
        )

    return GroupingPartitionRouter(
        group_size=model.group_size,
        underlying_partition_router=underlying_router,
        deduplicate=model.deduplicate if model.deduplicate is not None else True,
        parameters=model.parameters or {},
        config=config,
    )
Fix type safety issues in the implementation.
There are a couple of type-related issues that need to be addressed:
- The model.partition_router needs type safety when passed to _create_component_from_model. Consider adding a type check or assertion.
- The GroupingPartitionRouter constructor doesn't seem to accept a parameters argument.
What do you think about these changes?
 def create_grouping_partition_router(
     self, model: GroupingPartitionRouterModel, config: Config, **kwargs: Any
 ) -> GroupingPartitionRouter:
+    if not isinstance(model.partition_router, BaseModel):
+        raise ValueError(
+            f"partition_router must be a BaseModel instance, got {type(model.partition_router)}"
+        )
+
     underlying_router = self._create_component_from_model(
         model=model.partition_router, config=config
     )
     if not isinstance(underlying_router, PartitionRouter):
         raise ValueError(
             f"Underlying partition router must be a PartitionRouter subclass, got {type(underlying_router)}"
         )
     return GroupingPartitionRouter(
         group_size=model.group_size,
         underlying_partition_router=underlying_router,
         deduplicate=model.deduplicate if model.deduplicate is not None else True,
-        parameters=model.parameters or {},
         config=config,
     )
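The two guards discussed here (validate the model before delegating, then validate the built component) can be exercised in isolation. The sketch below uses stand-in classes only: `PartitionRouter`, `ListRouter`, `GroupingRouter`, `create_grouping_router`, and the `build` callback are all hypothetical and are not the CDK's classes or factory method. It also uses a plain `None` check where the suggestion above uses `isinstance(..., BaseModel)`.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


class PartitionRouter:
    """Stand-in base class (hypothetical, not the CDK class)."""


@dataclass
class ListRouter(PartitionRouter):
    values: List[Any]


@dataclass
class GroupingRouter(PartitionRouter):
    group_size: int
    underlying: PartitionRouter
    deduplicate: bool


def create_grouping_router(
    underlying_model: Optional[Any],
    group_size: int,
    deduplicate: Optional[bool],
    build: Callable[[Any], Any],
) -> GroupingRouter:
    # Guard 1: reject a missing model before delegating, so the value
    # is known to be non-None from here on.
    if underlying_model is None:
        raise ValueError("partition_router must be set")
    underlying = build(underlying_model)
    # Guard 2: the built component must really be a partition router.
    if not isinstance(underlying, PartitionRouter):
        raise ValueError(f"expected a PartitionRouter, got {type(underlying)}")
    return GroupingRouter(
        group_size=group_size,
        underlying=underlying,
        # An unset flag falls back to True, matching the schema default.
        deduplicate=True if deduplicate is None else deduplicate,
    )


router = create_grouping_router([1, 2, 3], 2, None, build=ListRouter)
```

Passing `build=list` instead of `build=ListRouter` trips the second guard, which is the "Invalid underlying router" branch of the first sequence diagram.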
🧰 Tools
🪛 GitHub Actions: Linters
[error] 3056-3056: Argument 'model' to '_create_component_from_model' of 'ModelToComponentFactory' has incompatible type 'CustomPartitionRouter | ListPartitionRouter | SubstreamPartitionRouter | None'; expected 'BaseModel'
[error] 3064-3064: Unexpected keyword argument 'parameters' for 'GroupingPartitionRouter'
  GroupingPartitionRouter:
    title: Grouping Partition Router
    description: >
      A decorator on top of a partition router that groups partitions into batches of a specified size.
      This is useful for APIs that support filtering by multiple partition keys in a single request.
      Note that per-partition incremental syncs may not work as expected because the grouping
      of partitions might change between syncs, potentially leading to inconsistent state tracking.
    type: object
    required:
      - type
      - group_size
      - underlying_partition_router
    properties:
      type:
        type: string
        enum: [GroupingPartitionRouter]
      group_size:
        title: Group Size
        description: The number of partitions to include in each group. This determines how many partition values are batched together in a single slice.
        type: integer
        minimum: 1
        examples:
          - 10
          - 50
      partition_router:
        title: Underlying Partition Router
        description: The partition router whose output will be grouped. This can be any valid partition router component.
        anyOf:
          - "$ref": "#/definitions/CustomPartitionRouter"
          - "$ref": "#/definitions/ListPartitionRouter"
          - "$ref": "#/definitions/SubstreamPartitionRouter"
      deduplicate:
        title: Deduplicate Partitions
        description: If true, ensures that partitions are unique within each group by removing duplicates based on the partition key.
        type: boolean
        default: true
      $parameters:
        type: object
        additionalProperties: true
  WaitUntilTimeFromHeader:
New Component Definition: GroupingPartitionRouter
The new GroupingPartitionRouter component looks well detailed and brings useful capabilities for batching partition keys. However, I noticed a potential mismatch: the required fields list includes underlying_partition_router, but in the properties you defined the field as partition_router. Should we rename the property to underlying_partition_router to align with the required schema? For example:
-      partition_router:
-        title: Underlying Partition Router
-        description: The partition router whose output will be grouped. This can be any valid partition router component.
-        anyOf:
-          - "$ref": "#/definitions/CustomPartitionRouter"
-          - "$ref": "#/definitions/ListPartitionRouter"
-          - "$ref": "#/definitions/SubstreamPartitionRouter"
+      underlying_partition_router:
+        title: Underlying Partition Router
+        description: The partition router whose output will be grouped. This can be any valid partition router component.
+        anyOf:
+          - "$ref": "#/definitions/CustomPartitionRouter"
+          - "$ref": "#/definitions/ListPartitionRouter"
+          - "$ref": "#/definitions/SubstreamPartitionRouter"
Would you agree that synchronizing the field name with its required identifier will help prevent schema validation issues? wdyt?
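A mismatch like this one can be caught mechanically by comparing a definition's `required` list against its `properties` keys. The sketch below is a minimal illustration on a hand-written dict that mirrors the shape of the YAML definition above; `missing_required_properties` is a hypothetical helper, not part of the CDK.

```python
from typing import Any, Dict, List


def missing_required_properties(schema: Dict[str, Any]) -> List[str]:
    """Return names listed under `required` that `properties` never defines."""
    properties = schema.get("properties", {})
    return [name for name in schema.get("required", []) if name not in properties]


# Abbreviated copy of the definition's shape, written by hand for this sketch.
grouping_schema = {
    "type": "object",
    "required": ["type", "group_size", "underlying_partition_router"],
    "properties": {
        "type": {"type": "string", "enum": ["GroupingPartitionRouter"]},
        "group_size": {"type": "integer", "minimum": 1},
        "partition_router": {"anyOf": []},
        "deduplicate": {"type": "boolean", "default": True},
        "$parameters": {"type": "object", "additionalProperties": True},
    },
}

mismatches = missing_required_properties(grouping_schema)
```

Either side can be changed to resolve the mismatch. The Python model quoted earlier in this review names the field `partition_router`, so updating the `required` entry instead of the property name would be the other way to make the two agree.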
Implements: https://github.com/airbytehq/airbyte-internal-issues/issues/11622
Summary by CodeRabbit