From 2dddf8ec29610b7e323a4ab963b4c8d77638b333 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 26 Nov 2024 03:55:02 +0100 Subject: [PATCH 01/19] Add component resolver and http component resolver --- .../declarative_component_schema.yaml | 82 ++++++++++ .../models/declarative_component_schema.py | 58 ++++++++ .../parsers/model_to_component_factory.py | 113 +++++++++++--- .../declarative/partition_routers/__init__.py | 3 +- .../sources/declarative/resolvers/__init__.py | 8 + .../resolvers/components_resolver.py | 49 ++++++ .../resolvers/http_components_resolver.py | 140 ++++++++++++++++++ 7 files changed, 431 insertions(+), 22 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/resolvers/__init__.py create mode 100644 airbyte_cdk/sources/declarative/resolvers/components_resolver.py create mode 100644 airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 654a6b751..cfba4f3ae 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2700,6 +2700,88 @@ definitions: $parameters: type: object additionalProperties: true + ComponentMappingDefinition: + title: Component Mapping Definition + description: Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. + type: object + required: + - type + - key + - value + properties: + type: + type: string + enum: [ComponentMappingDefinition] + key: + title: Key + description: The target key in the stream template where the value will be added or updated. + type: string + value: + title: Value + description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime. + type: string + interpolation_context: + - config + - stream_template_config + - components_values + examples: + - "{{ components_values['updates'] }}" + - "{{ components_values['MetaData']['LastUpdatedTime'] }}" + - "{{ config['segment_id'] }}" + value_type: + title: Value Type + description: The expected data type of the value. If omitted, the type will be inferred from the value provided. + "$ref": "#/definitions/ValueType" + condition: + title: Condition + description: An optional condition that must evaluate to `true` for the mapping to be applied. This can use interpolation for dynamic evaluation. + type: string + default: "" + interpolation_context: + - config + - stream_template_config + - components_values + examples: + - "{{ components_values['created_at'] >= stream_interval['start_time'] }}" + - "{{ components_values.status in ['active', 'expired'] }}" + $parameters: + type: object + additionalProperties: true + HttpComponentsResolver: + type: object + properties: + type: + type: string + enum: [HttpComponentsResolver] + retriever: + title: Retriever + description: Component used to coordinate how records are extracted across stream slices and request pages. + anyOf: + - "$ref": "#/definitions/AsyncRetriever" + - "$ref": "#/definitions/CustomRetriever" + - "$ref": "#/definitions/SimpleRetriever" + components_mapping: + type: array + items: + - "$ref": "#/definitions/ComponentMappingDefinition" + $parameters: + type: object + additionalProperties: true + required: + - type + - retriever + - components_mapping + DynamicDeclarativeStream: + type: object + properties: + stream_template: + title: Stream Template + description: Reference to the stream template. + "$ref": "#/definitions/DeclarativeStream" + components_resolver: + anyOf: + - "$ref": "#/definitions/HttpComponentsResolver" + - "$ref": "#/definitions/ConfigDrivenComponentsParser" interpolation: variables: - title: config diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 174334d9c..bd8f86eab 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1031,6 +1031,44 @@ class WaitUntilTimeFromHeader(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class ComponentMappingDefinition(BaseModel): + type: Literal["ComponentMappingDefinition"] + key: str = Field( + ..., + description="The target key in the stream template where the value will be added or updated.", + title="Key", + ) + value: str = Field( + ..., + description="The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime.", + examples=[ + "{{ components_values['updates'] }}", + "{{ components_values['MetaData']['LastUpdatedTime'] }}", + "{{ config['segment_id'] }}", + ], + title="Value", + ) + value_type: Optional[ValueType] = Field( + None, + description="The expected data type of the value. If omitted, the type will be inferred from the value provided.", + title="Value Type", + ) + condition: Optional[str] = Field( + "", + description="An optional condition that must evaluate to `true` for the mapping to be applied. This can use interpolation for dynamic evaluation.", + examples=[ + "{{ components_values['created_at'] >= stream_interval['start_time'] }}", + "{{ components_values.status in ['active', 'expired'] }}", + ], + title="Condition", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class ConfigDrivenComponentsParser(BaseModel): + __root__: Any + + class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -1739,6 +1777,26 @@ class SubstreamPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class HttpComponentsResolver(BaseModel): + type: Literal["HttpComponentsResolver"] + retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field( + ..., + description="Component used to coordinate how records are extracted across stream slices and request pages.", + title="Retriever", + ) + components_mapping: List[ComponentMappingDefinition] + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + +class DynamicDeclarativeStream(BaseModel): + stream_template: Optional[DeclarativeStream] = Field( + None, description="Reference to the stream template.", title="Stream Template" + ) + components_resolver: Optional[Union[HttpComponentsResolver, ConfigDrivenComponentsParser]] = ( + None + ) + + CompositeErrorHandler.update_forward_refs() DeclarativeSource.update_forward_refs() SelectiveAuthenticator.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index d2dd9d9dc..c81d901d2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -30,6 +30,10 @@ from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository +from airbyte_cdk.sources.declarative.resolvers import ( + HttpComponentsResolver, + ComponentMappingDefinition, +) from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator from airbyte_cdk.sources.declarative.auth.declarative_authenticator import ( @@ -277,6 +281,12 @@ SimpleRetriever as SimpleRetrieverModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpComponentsResolver as HttpComponentsResolverModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ComponentMappingDefinition as ComponentMappingDefinitionModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) @@ -291,6 +301,7 @@ XmlDecoder as XmlDecoderModel, ) from airbyte_cdk.sources.declarative.partition_routers import ( + PartitionRouter, CartesianProductStreamSlicer, ListPartitionRouter, SinglePartitionRouter, @@ -461,6 +472,8 @@ def _init_mappings(self) -> None: WaitTimeFromHeaderModel: self.create_wait_time_from_header, WaitUntilTimeFromHeaderModel: self.create_wait_until_time_from_header, AsyncRetrieverModel: self.create_async_retriever, + HttpComponentsResolverModel: self.create_http_components_resolver, + ComponentMappingDefinitionModel: self.create_components_mapping_definition, } # Needed for the case where we need to perform a second parse on the fields of a custom component @@ -1279,19 +1292,20 @@ def create_declarative_stream( parameters=model.parameters or {}, ) - def _merge_stream_slicers( - self, model: DeclarativeStreamModel, config: Config - ) -> Optional[StreamSlicer]: - stream_slicer = None + def _build_stream_slicer_from_partition_router( + self, + model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel], + config: Config, + ) -> Optional[PartitionRouter]: if ( - hasattr(model.retriever, "partition_router") - and isinstance(model.retriever, SimpleRetrieverModel) - and model.retriever.partition_router + hasattr(model, "partition_router") + and isinstance(model, SimpleRetrieverModel) + and model.partition_router ): - stream_slicer_model = model.retriever.partition_router + stream_slicer_model = model.partition_router if isinstance(stream_slicer_model, list): - stream_slicer = CartesianProductStreamSlicer( + return CartesianProductStreamSlicer( [ self._create_component_from_model(model=slicer, config=config) for slicer in stream_slicer_model @@ -1299,9 +1313,24 @@ def _merge_stream_slicers( parameters={}, ) else: - stream_slicer = self._create_component_from_model( - model=stream_slicer_model, config=config - ) + return self._create_component_from_model(model=stream_slicer_model, config=config) # type: ignore[no-any-return] + # Will be created PartitionRouter as stream_slicer_model is model.partition_router + return None + + def _build_resumable_cursor_from_paginator( + self, + model: Union[AsyncRetrieverModel, CustomRetrieverModel, SimpleRetrieverModel], + stream_slicer: Optional[StreamSlicer], + ) -> Optional[StreamSlicer]: + if hasattr(model, "paginator") and model.paginator and not stream_slicer: + # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor` + return ResumableFullRefreshCursor(parameters={}) + return None + + def _merge_stream_slicers( + self, model: DeclarativeStreamModel, config: Config + ) -> Optional[StreamSlicer]: + stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) if model.incremental_sync and stream_slicer: incremental_sync_model = model.incremental_sync @@ -1342,15 +1371,7 @@ def _merge_stream_slicers( ), partition_router=stream_slicer, ) - elif ( - hasattr(model.retriever, "paginator") - and model.retriever.paginator - and not stream_slicer - ): - # For the regular Full-Refresh streams, we use the high level `ResumableFullRefreshCursor` - return ResumableFullRefreshCursor(parameters={}) - else: - return None + return self._build_resumable_cursor_from_paginator(model.retriever, stream_slicer) def create_default_error_handler( self, model: DefaultErrorHandlerModel, config: Config, **kwargs: Any @@ -2182,3 +2203,53 @@ def get_message_repository(self) -> MessageRepository: def _evaluate_log_level(self, emit_connector_builder_messages: bool) -> Level: return Level.DEBUG if emit_connector_builder_messages else Level.INFO + + @staticmethod + def create_components_mapping_definition( + model: ComponentMappingDefinitionModel, config: Config, **kwargs: Any + ) -> ComponentMappingDefinition: + interpolated_value = InterpolatedString.create( + model.value, parameters=model.parameters or {} + ) + return ComponentMappingDefinition( + key=model.key, + value=interpolated_value, + value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type), + condition=model.condition or "", + parameters=model.parameters or {}, + ) + + def create_http_components_resolver( + self, model: HttpComponentsResolverModel, config: Config + ) -> Any: + stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config) + combined_slicers = self._build_resumable_cursor_from_paginator( + model.retriever, stream_slicer + ) + + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name="", + primary_key=None, + stream_slicer=combined_slicers, + transformations=[], + ) + + components_mapping = [ + self._create_component_from_model( + model=components_mapping_definition_model, + value_type=ModelToComponentFactory._json_schema_type_name_to_type( + components_mapping_definition_model.value_type + ), + config=config, + ) + for components_mapping_definition_model in model.components_mapping + ] + + return HttpComponentsResolver( + retriever=retriever, + config=config, + components_mapping=components_mapping, + parameters=model.parameters or {}, + ) diff --git a/airbyte_cdk/sources/declarative/partition_routers/__init__.py b/airbyte_cdk/sources/declarative/partition_routers/__init__.py index 86e472a42..9487f5e1d 100644 --- a/airbyte_cdk/sources/declarative/partition_routers/__init__.py +++ b/airbyte_cdk/sources/declarative/partition_routers/__init__.py @@ -6,5 +6,6 @@ from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ListPartitionRouter from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import SinglePartitionRouter from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import SubstreamPartitionRouter +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter -__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter"] +__all__ = ["CartesianProductStreamSlicer", "ListPartitionRouter", "SinglePartitionRouter", "SubstreamPartitionRouter", "PartitionRouter"] diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py new file mode 100644 index 000000000..8dcbaf2e2 --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition +from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver + +__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition"] diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py new file mode 100644 index 000000000..69303cc02 --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -0,0 +1,49 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from abc import abstractmethod +from dataclasses import InitVar, dataclass +from typing import Any, Dict, Mapping, Optional, Type, Union, Iterable + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString + + +@dataclass(frozen=True) +class ComponentMappingDefinition: + """Defines the key-value mapping configuration for a stream component.""" + + key: str + value: Union["InterpolatedString", str] + value_type: Optional[Type[Any]] + parameters: InitVar[Mapping[str, Any]] + condition: str = "" + + +@dataclass(frozen=True) +class ResolvedComponentMappingDefinition: + """Represents a parsed and resolved component mapping for a stream configuration.""" + + key: str + value: Union["InterpolatedString", str] + value_type: Optional[Type[Any]] + parameters: InitVar[Mapping[str, Any]] + condition: Optional[Union["InterpolatedBoolean", str]] = "" + + +@dataclass +class ComponentsResolver: + """ + Abstract base class for resolving components in a stream template. + """ + + @abstractmethod + def resolve_components( + self, stream_template_config: Dict[str, Any] + ) -> Iterable[Dict[str, Any]]: + """ + Maps and populates values into a stream template configuration. + :param stream_template_config: The stream template with placeholders for components. + :yields: The resolved stream config with populated values. + """ + pass diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py new file mode 100644 index 000000000..5abe8dd4e --- /dev/null +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -0,0 +1,140 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from copy import deepcopy +from dataclasses import InitVar, dataclass, field +from typing import Any, Dict, List, Mapping, Optional, Iterable, Union + +from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString +from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.types import Config +from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( + ComponentsResolver, + ComponentMappingDefinition, + ResolvedComponentMappingDefinition, +) + + +@dataclass +class HttpComponentsResolver(ComponentsResolver): + """ + Resolves and populates stream templates with components fetched via an HTTP retriever. + + Attributes: + retriever (Retriever): The retriever used to fetch data from an API. + config (Config): Configuration object for the resolver. + components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve. + parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation. + """ + + retriever: Retriever + config: Config + components_mapping: List[ComponentMappingDefinition] + parameters: InitVar[Mapping[str, Any]] + _resolved_components: List[ResolvedComponentMappingDefinition] = field( + init=False, repr=False, default_factory=list + ) + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + """ + Initializes and parses component mappings, converting them to resolved definitions. + + Args: + parameters (Mapping[str, Any]): Parameters for interpolation. + """ + for component_mapping in self.components_mapping: + condition = component_mapping.condition or True + + if isinstance(component_mapping.value, (str, InterpolatedString)): + interpolated_value = ( + InterpolatedString.create(component_mapping.value, parameters=parameters) + if isinstance(component_mapping.value, str) + else component_mapping.value + ) + self._resolved_components.append( + ResolvedComponentMappingDefinition( + key=component_mapping.key, + value=interpolated_value, + value_type=component_mapping.value_type, + condition=InterpolatedBoolean(condition=condition, parameters=parameters), + parameters=parameters, + ) + ) + else: + raise ValueError( + f"Expected a string or InterpolatedString for value in mapping: {component_mapping}" + ) + + def _update_config( + self, + component_config: Dict[str, Any], + key: str, + value: Any, + condition: Optional[Union[InterpolatedBoolean, str]], + **kwargs, + ) -> Dict[str, Any]: + """ + Recursively updates the configuration dictionary for a specific key. + + Args: + component_config (Dict[str, Any]): Component config to update. + key (str): Target key to update. + value (Any): Value to assign to the target key. + condition (Optional[InterpolatedBoolean]): Condition for applying the update. + + Returns: + Dict[str, Any]: Updated configuration dictionary. + """ + kwargs["current_component_config"] = component_config + should_update = condition.eval(self.config, **kwargs) if condition else True + + for key, value in component_config.items(): + if key == key and should_update: + component_config[key] = value + elif isinstance(value, dict): + component_config[key] = self._update_config(value, key, value, condition, **kwargs) + elif isinstance(value, list): + component_config[key] = [ + self._update_config(item, key, value, condition, **kwargs) + if isinstance(item, dict) + else item + for item in value + ] + + return component_config + + def resolve_components( + self, stream_template_config: Dict[str, Any] + ) -> Iterable[Dict[str, Any]]: + """ + Resolves components in the stream template configuration by populating values. + + Args: + stream_template_config (Dict[str, Any]): Stream template to populate. + + Yields: + Dict[str, Any]: Updated configurations with resolved components. + """ + kwargs = {"stream_template_config": stream_template_config} + + for components_values in self.retriever.read_records({}): + updated_config = deepcopy(stream_template_config) + kwargs["components_values"] = components_values + + for resolved_component in self._resolved_components: + valid_types = ( + (resolved_component.value_type,) if resolved_component.value_type else None + ) + value = resolved_component.value.eval( + self.config, valid_types=valid_types, **kwargs + ) + updated_config = self._update_config( + updated_config, + key=resolved_component.key, + value=value, + condition=resolved_component.condition, + **kwargs, + ) + + yield updated_config From c12f3511b532c3b2742f1179c06ee2273669eb09 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 26 Nov 2024 04:22:06 +0100 Subject: [PATCH 02/19] Fix mypy --- .../declarative/resolvers/components_resolver.py | 2 +- .../declarative/resolvers/http_components_resolver.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index 69303cc02..002d4767b 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -25,7 +25,7 @@ class ResolvedComponentMappingDefinition: """Represents a parsed and resolved component mapping for a stream configuration.""" key: str - value: Union["InterpolatedString", str] + value: "InterpolatedString" value_type: Optional[Type[Any]] parameters: InitVar[Mapping[str, Any]] condition: Optional[Union["InterpolatedBoolean", str]] = "" diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 5abe8dd4e..5a2b71b77 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -44,7 +44,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: parameters (Mapping[str, Any]): Parameters for interpolation. """ for component_mapping in self.components_mapping: - condition = component_mapping.condition or True + condition = component_mapping.condition or "True" if isinstance(component_mapping.value, (str, InterpolatedString)): interpolated_value = ( @@ -71,8 +71,8 @@ def _update_config( component_config: Dict[str, Any], key: str, value: Any, - condition: Optional[Union[InterpolatedBoolean, str]], - **kwargs, + condition: Optional[InterpolatedBoolean], + **kwargs: Any, ) -> Dict[str, Any]: """ Recursively updates the configuration dictionary for a specific key. @@ -120,7 +120,7 @@ def resolve_components( for components_values in self.retriever.read_records({}): updated_config = deepcopy(stream_template_config) - kwargs["components_values"] = components_values + kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any] for resolved_component in self._resolved_components: valid_types = ( @@ -133,7 +133,7 @@ def resolve_components( updated_config, key=resolved_component.key, value=value, - condition=resolved_component.condition, + condition=resolved_component.condition, # type: ignore[arg-type] # The condition in resolved_component always has the type InterpolatedBoolean if it exists. **kwargs, ) From e2505b151ac3b836372fd894f500f6d73274a1ba Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 26 Nov 2024 04:26:55 +0100 Subject: [PATCH 03/19] Fix formatting --- .../sources/declarative/resolvers/http_components_resolver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 5a2b71b77..9395c3e2a 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -4,7 +4,7 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, Dict, List, Mapping, Optional, Iterable, Union +from typing import Any, Dict, List, Mapping, Optional, Iterable from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever From c8e350972db564a89f9cfa043f498a438450defd Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 27 Nov 2024 00:34:48 +0100 Subject: [PATCH 04/19] Added dynamic stream component --- .../concurrent_declarative_source.py | 3 +- .../declarative_component_schema.yaml | 11 ++- .../manifest_declarative_source.py | 54 +++++++++++- .../models/declarative_component_schema.py | 85 +++++++++++-------- .../sources/declarative/resolvers/__init__.py | 7 +- .../resolvers/components_resolver.py | 4 +- .../resolvers/http_components_resolver.py | 3 + 7 files changed, 126 insertions(+), 41 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 001740a35..02b7865d6 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -181,7 +181,8 @@ def _group_streams( state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later name_to_stream_mapping = { - stream["name"]: stream for stream in self.resolved_manifest["streams"] + stream["name"]: stream + for stream in self._stream_configs(self.resolved_manifest, config) } for declarative_stream in self.streams(config=config): diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index cfba4f3ae..7786cde4f 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -19,6 +19,10 @@ properties: type: array items: "$ref": "#/definitions/DeclarativeStream" + dynamic_streams: + type: array + items: + "$ref": "#/definitions/DynamicDeclarativeStream" version: type: string description: The version of the Airbyte CDK used to build and test the source. @@ -2702,7 +2706,7 @@ definitions: additionalProperties: true ComponentMappingDefinition: title: Component Mapping Definition - description: Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. + description: Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. (This component is experimental. Use at your own risk.) type: object required: - type @@ -2749,6 +2753,7 @@ definitions: additionalProperties: true HttpComponentsResolver: type: object + description: Component resolve and populates stream templates with components fetched via an HTTP retriever. (This component is experimental. Use at your own risk.) properties: type: type: string @@ -2773,15 +2778,17 @@ definitions: - components_mapping DynamicDeclarativeStream: type: object + description: A component that described how will be created declarative streams based on stream template. (This component is experimental. Use at your own risk.) properties: stream_template: title: Stream Template description: Reference to the stream template. "$ref": "#/definitions/DeclarativeStream" components_resolver: + title: Components Resolver + description: Component resolve and populates stream templates with components values. anyOf: - "$ref": "#/definitions/HttpComponentsResolver" - - "$ref": "#/definitions/ConfigDrivenComponentsParser" interpolation: variables: - title: config diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 223cbc0b6..c75f80ac2 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -30,6 +30,8 @@ from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( ManifestComponentTransformer, ) +from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING + from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( ManifestReferenceResolver, ) @@ -119,7 +121,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message( extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)} ) - stream_configs = self._stream_configs(self._source_config) + stream_configs = self._stream_configs(self._source_config, config) source_streams = [ self._constructor.create_component( @@ -294,13 +296,61 @@ def _parse_version( # No exception return parsed_version - def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]: + def _stream_configs( + self, manifest: Mapping[str, Any], config: Mapping[str, Any] + ) -> List[Dict[str, Any]]: # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config stream_configs: List[Dict[str, Any]] = manifest.get("streams", []) + + # Add dynamic stream configs to the common stream configs + stream_configs.extend(self._dynamic_stream_configs(manifest, config)) + for s in stream_configs: if "type" not in s: s["type"] = "DeclarativeStream" return stream_configs + def _dynamic_stream_configs( + self, manifest: Mapping[str, Any], config: Mapping[str, Any] + ) -> List[Dict[str, Any]]: + dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) + dynamic_stream_configs: List[Dict[str, Any]] = [] + + for dynamic_definition in dynamic_stream_definitions: + components_resolver_config = dynamic_definition["components_resolver"] + + if not components_resolver_config: + raise ValueError( + f"Missing 'components_resolver' in dynamic definition: {dynamic_definition}" + ) + + resolver_type = components_resolver_config.get("type") + if not resolver_type: + raise ValueError( + f"Missing 'type' in components resolver configuration: {components_resolver_config}" + ) + + if resolver_type not in COMPONENTS_RESOLVER_TYPE_MAPPING: + raise ValueError( + f"Invalid components resolver type '{resolver_type}'. " + f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." + ) + + # Create a resolver for dynamic components based on type + components_resolver = self._constructor.create_component( + COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config + ) + + stream_template_config = dynamic_definition["stream_template"] + dynamic_stream_configs.extend( + list( + components_resolver.resolve_components( + stream_template_config=stream_template_config + ) + ) + ) + + return dynamic_stream_configs + def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None: self.logger.debug("declarative source created from manifest", extra=extra_args) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index bd8f86eab..80af4a7d3 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -752,19 +754,21 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -782,7 +786,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1065,10 +1071,6 @@ class ComponentMappingDefinition(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class ConfigDrivenComponentsParser(BaseModel): - __root__: Any - - class AddedFieldDefinition(BaseModel): type: Literal["AddedFieldDefinition"] path: List[str] = Field( @@ -1373,6 +1375,7 @@ class Config: type: Literal["DeclarativeSource"] check: CheckStream streams: List[DeclarativeStream] + dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None version: str = Field( ..., description="The version of the Airbyte CDK used to build and test the source.", @@ -1442,21 +1445,25 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1674,7 +1681,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1743,7 +1754,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1792,8 +1807,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: Optional[DeclarativeStream] = Field( None, description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Optional[Union[HttpComponentsResolver, ConfigDrivenComponentsParser]] = ( - None + components_resolver: Optional[HttpComponentsResolver] = Field( + None, + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/resolvers/__init__.py b/airbyte_cdk/sources/declarative/resolvers/__init__.py index 8dcbaf2e2..17a3b5d52 100644 --- a/airbyte_cdk/sources/declarative/resolvers/__init__.py +++ b/airbyte_cdk/sources/declarative/resolvers/__init__.py @@ -4,5 +4,10 @@ from airbyte_cdk.sources.declarative.resolvers.components_resolver import ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import HttpComponentsResolver +from airbyte_cdk.sources.declarative.models import HttpComponentsResolver as HttpComponentsResolverModel -__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition"] +COMPONENTS_RESOLVER_TYPE_MAPPING = { + "HttpComponentsResolver": HttpComponentsResolverModel +} + +__all__ = ["ComponentsResolver", "HttpComponentsResolver", "ComponentMappingDefinition", "ResolvedComponentMappingDefinition", "COMPONENTS_RESOLVER_TYPE_MAPPING"] diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index 002d4767b..827a28097 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -5,8 +5,9 @@ from abc import abstractmethod from dataclasses import InitVar, dataclass from typing import Any, Dict, Mapping, Optional, Type, Union, Iterable - +from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString +from deprecated.classic import deprecated @dataclass(frozen=True) @@ -31,6 +32,7 @@ class ResolvedComponentMappingDefinition: condition: Optional[Union["InterpolatedBoolean", str]] = "" +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass class ComponentsResolver: """ diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 9395c3e2a..1b6220359 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -8,14 +8,17 @@ from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( ComponentsResolver, ComponentMappingDefinition, ResolvedComponentMappingDefinition, ) +from deprecated.classic import deprecated +@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass class HttpComponentsResolver(ComponentsResolver): """ From 6e3ececf493de736846750a21fbada961f1ecf0d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 27 Nov 2024 00:43:56 +0100 Subject: [PATCH 05/19] Fix model --- .../sources/declarative/declarative_component_schema.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 7786cde4f..74901fe53 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2787,8 +2787,7 @@ definitions: components_resolver: title: Components Resolver description: Component resolve and populates stream templates with components values. - anyOf: - - "$ref": "#/definitions/HttpComponentsResolver" + "$ref": "#/definitions/HttpComponentsResolver" interpolation: variables: - title: config From b1365244b00c9d85e8439c47dcefcd068ec23681 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Wed, 27 Nov 2024 01:30:21 +0100 Subject: [PATCH 06/19] Add unit tests --- .../models/declarative_component_schema.py | 74 +++++++----------- .../resolvers/http_components_resolver.py | 12 +-- .../sources/declarative/resolvers/__init__.py | 3 + .../test_http_components_resolver.py | 75 +++++++++++++++++++ 4 files changed, 112 insertions(+), 52 deletions(-) create mode 100644 unit_tests/sources/declarative/resolvers/__init__.py create mode 100644 unit_tests/sources/declarative/resolvers/test_http_components_resolver.py diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 80af4a7d3..7f15410de 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -754,21 +752,19 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -786,9 +782,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1445,25 +1439,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1681,11 +1671,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1754,11 +1740,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 1b6220359..900a09d39 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -72,8 +72,8 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def _update_config( self, component_config: Dict[str, Any], - key: str, - value: Any, + target_key: str, + target_value: Any, condition: Optional[InterpolatedBoolean], **kwargs: Any, ) -> Dict[str, Any]: @@ -93,8 +93,8 @@ def _update_config( should_update = condition.eval(self.config, **kwargs) if condition else True for key, value in component_config.items(): - if key == key and should_update: - component_config[key] = value + if key == target_key and should_update: + component_config[key] = target_value elif isinstance(value, dict): component_config[key] = self._update_config(value, key, value, condition, **kwargs) elif isinstance(value, list): @@ -134,8 +134,8 @@ def resolve_components( ) updated_config = self._update_config( updated_config, - key=resolved_component.key, - value=value, + target_key=resolved_component.key, + target_value=value, condition=resolved_component.condition, # type: ignore[arg-type] # The condition in resolved_component always has the type InterpolatedBoolean if it exists. **kwargs, ) diff --git a/unit_tests/sources/declarative/resolvers/__init__.py b/unit_tests/sources/declarative/resolvers/__init__.py new file mode 100644 index 000000000..66f6de8cb --- /dev/null +++ b/unit_tests/sources/declarative/resolvers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py new file mode 100644 index 000000000..7591d36b9 --- /dev/null +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -0,0 +1,75 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import pytest +from unittest.mock import MagicMock +from airbyte_cdk.sources.declarative.resolvers import ( + ComponentMappingDefinition, + HttpComponentsResolver, +) + + +@pytest.mark.parametrize( + "components_mapping, retriever_data, stream_template_config, expected_result", + [ + ( + [ + ComponentMappingDefinition( + key="key1", + value="{{components_values['key1']}}", + value_type=str, + condition="True", + parameters={}, + ), + ComponentMappingDefinition( + key="key2", + value="{{components_values['key2']}}", + value_type=str, + condition="False", + parameters={}, + ), + ], + [{"key1": "updated_value1", "key2": "updated_value2"}], + {"key1": None, "key2": None}, + [{"key1": "updated_value1", "key2": None}], # Only key1 is updated + ), + ( + [ + ComponentMappingDefinition( + key="key3", + value="{{components_values['key3']}}", + value_type=str, + condition="True", + parameters={}, + ), + ], + [{"key3": "updated_value3"}], + {"key3": None}, + [{"key3": "updated_value3"}], # key3 is updated + ), + ], +) +def test_http_components_resolver( + components_mapping, retriever_data, stream_template_config, expected_result +): + # Mock the retriever to simulate reading records + mock_retriever = MagicMock() + mock_retriever.read_records.return_value = retriever_data + + # Use a simple dictionary for the config, as Config should be a Mapping + config = {} + + # Instantiate the resolver with mocked data + resolver = HttpComponentsResolver( + retriever=mock_retriever, + config=config, + components_mapping=components_mapping, + parameters={}, + ) + + # Run the resolve_components method and convert the result to a list + result = list(resolver.resolve_components(stream_template_config=stream_template_config)) + + # Assert the resolved components match the expected result + assert result == expected_result From 1d83663109f47754a6074a4ffcc880870f83bde7 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 01:53:13 +0100 Subject: [PATCH 07/19] Replace key with field_path and update according to review --- .../concurrent_declarative_source.py | 11 ++-- .../declarative_component_schema.yaml | 41 ++++++------ .../manifest_declarative_source.py | 31 +++++---- .../models/declarative_component_schema.py | 63 ++++++++++++++---- .../parsers/model_to_component_factory.py | 7 +- .../resolvers/components_resolver.py | 14 ++-- .../resolvers/http_components_resolver.py | 64 ++++--------------- .../test_http_components_resolver.py | 26 +------- 8 files changed, 119 insertions(+), 138 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 02b7865d6..0181cb34a 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -180,10 +180,13 @@ def _group_streams( state_manager = ConnectorStateManager(state=self._state) # type: ignore # state is always in the form of List[AirbyteStateMessage]. The ConnectorStateManager should use generics, but this can be done later - name_to_stream_mapping = { - stream["name"]: stream - for stream in self._stream_configs(self.resolved_manifest, config) - } + # Combine streams and dynamic_streams. Note: both cannot be empty at the same time, + # and this is validated during the initialization of the source. + streams = self.resolved_manifest.get("streams", []) + self.resolved_manifest.get( + "dynamic_streams", [] + ) + + name_to_stream_mapping = {stream["name"]: stream for stream in streams} for declarative_stream in self.streams(config=config): # Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 74901fe53..0c31895e2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -7,8 +7,12 @@ version: 1.0.0 required: - type - check - - streams - version +anyOf: + - required: + - streams + - required: + - dynamic_streams properties: type: type: string @@ -2706,20 +2710,31 @@ definitions: additionalProperties: true ComponentMappingDefinition: title: Component Mapping Definition - description: Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. (This component is experimental. Use at your own risk.) + description: (This component is experimental. Use at your own risk.) Specifies a mapping definition to update or add fields in a record or configuration. This allows dynamic mapping of data by interpolating values into the template based on provided contexts. type: object required: - type - - key + - field_path - value properties: type: type: string enum: [ComponentMappingDefinition] - key: - title: Key - description: The target key in the stream template where the value will be added or updated. - type: string + field_path: + title: Field Path + description: A list of potentially nested fields indicating the full path where value will be added or updated. + type: array + items: + - type: string + interpolation_content: + - config + - components_values + - stream_template_config + examples: + - ["data"] + - ["data", "records"] + - ["data", "{{ parameters.name }}"] + - ["data", "*", "record"] value: title: Value description: The dynamic or static value to assign to the key. Interpolated values can be used to dynamically determine the value during runtime. @@ -2736,18 +2751,6 @@ definitions: title: Value Type description: The expected data type of the value. If omitted, the type will be inferred from the value provided. "$ref": "#/definitions/ValueType" - condition: - title: Condition - description: An optional condition that must evaluate to `true` for the mapping to be applied. This can use interpolation for dynamic evaluation. - type: string - default: "" - interpolation_context: - - config - - stream_template_config - - components_values - examples: - - "{{ components_values['created_at'] >= stream_interval['start_time'] }}" - - "{{ components_values.status in ['active', 'expired'] }}" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index c75f80ac2..ebf149e56 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -121,7 +121,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: self._emit_manifest_debug_message( extra_args={"source_name": self.name, "parsed_config": json.dumps(self._source_config)} ) - stream_configs = self._stream_configs(self._source_config, config) + + stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs( + self._source_config, config + ) source_streams = [ self._constructor.create_component( @@ -235,7 +238,8 @@ def _validate_source(self) -> None: ) streams = self._source_config.get("streams") - if not streams: + dynamic_streams = self._source_config.get("dynamic_streams") + if not (streams or dynamic_streams): raise ValidationError( f"A valid manifest should have at least one stream defined. Got {streams}" ) @@ -296,15 +300,9 @@ def _parse_version( # No exception return parsed_version - def _stream_configs( - self, manifest: Mapping[str, Any], config: Mapping[str, Any] - ) -> List[Dict[str, Any]]: + def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]: # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config stream_configs: List[Dict[str, Any]] = manifest.get("streams", []) - - # Add dynamic stream configs to the common stream configs - stream_configs.extend(self._dynamic_stream_configs(manifest, config)) - for s in stream_configs: if "type" not in s: s["type"] = "DeclarativeStream" @@ -342,13 +340,14 @@ def _dynamic_stream_configs( ) stream_template_config = dynamic_definition["stream_template"] - dynamic_stream_configs.extend( - list( - components_resolver.resolve_components( - stream_template_config=stream_template_config - ) - ) - ) + + for dynamic_stream in components_resolver.resolve_components( + stream_template_config=stream_template_config + ): + if "type" not in dynamic_stream: + dynamic_stream["type"] = "DeclarativeStream" + + dynamic_stream_configs.append(dynamic_stream) return dynamic_stream_configs diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 7f15410de..83f8aecae 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1033,10 +1033,16 @@ class WaitUntilTimeFromHeader(BaseModel): class ComponentMappingDefinition(BaseModel): type: Literal["ComponentMappingDefinition"] - key: str = Field( + field_path: List[str] = Field( ..., - description="The target key in the stream template where the value will be added or updated.", - title="Key", + description="A list of potentially nested fields indicating the full path where value will be added or updated.", + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", ) value: str = Field( ..., @@ -1053,15 +1059,6 @@ class ComponentMappingDefinition(BaseModel): description="The expected data type of the value. If omitted, the type will be inferred from the value provided.", title="Value Type", ) - condition: Optional[str] = Field( - "", - description="An optional condition that must evaluate to `true` for the mapping to be applied. This can use interpolation for dynamic evaluation.", - examples=[ - "{{ components_values['created_at'] >= stream_interval['start_time'] }}", - "{{ components_values.status in ['active', 'expired'] }}", - ], - title="Condition", - ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1362,7 +1359,7 @@ class CompositeErrorHandler(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class DeclarativeSource(BaseModel): +class DeclarativeSource1(BaseModel): class Config: extra = Extra.forbid @@ -1388,6 +1385,43 @@ class Config: ) +class DeclarativeSource2(BaseModel): + class Config: + extra = Extra.forbid + + type: Literal["DeclarativeSource"] + check: CheckStream + streams: Optional[List[DeclarativeStream]] = None + dynamic_streams: List[DynamicDeclarativeStream] + version: str = Field( + ..., + description="The version of the Airbyte CDK used to build and test the source.", + ) + schemas: Optional[Schemas] = None + definitions: Optional[Dict[str, Any]] = None + spec: Optional[Spec] = None + concurrency_level: Optional[ConcurrencyLevel] = None + metadata: Optional[Dict[str, Any]] = Field( + None, + description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.", + ) + description: Optional[str] = Field( + None, + description="A description of the connector. It will be presented on the Source documentation page.", + ) + + +class DeclarativeSource(BaseModel): + class Config: + extra = Extra.forbid + + __root__: Union[DeclarativeSource1, DeclarativeSource2] = Field( + ..., + description="An API source that extracts data according to its declarative components.", + title="DeclarativeSource", + ) + + class SelectiveAuthenticator(BaseModel): class Config: extra = Extra.allow @@ -1797,7 +1831,8 @@ class DynamicDeclarativeStream(BaseModel): CompositeErrorHandler.update_forward_refs() -DeclarativeSource.update_forward_refs() +DeclarativeSource1.update_forward_refs() +DeclarativeSource2.update_forward_refs() SelectiveAuthenticator.update_forward_refs() DeclarativeStream.update_forward_refs() SessionTokenAuthenticator.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c81d901d2..58df95c10 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2211,11 +2211,14 @@ def create_components_mapping_definition( interpolated_value = InterpolatedString.create( model.value, parameters=model.parameters or {} ) + field_path = [ + InterpolatedString.create(path, parameters=model.parameters or {}) + for path in model.field_path + ] return ComponentMappingDefinition( - key=model.key, + field_path=field_path, value=interpolated_value, value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type), - condition=model.condition or "", parameters=model.parameters or {}, ) diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index 827a28097..af1d76345 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -2,11 +2,11 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # -from abc import abstractmethod +from abc import ABC, abstractmethod from dataclasses import InitVar, dataclass -from typing import Any, Dict, Mapping, Optional, Type, Union, Iterable +from typing import Any, Dict, Mapping, Optional, Type, Union, Iterable, List from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from deprecated.classic import deprecated @@ -14,27 +14,25 @@ class ComponentMappingDefinition: """Defines the key-value mapping configuration for a stream component.""" - key: str + field_path: List["InterpolatedString"] value: Union["InterpolatedString", str] value_type: Optional[Type[Any]] parameters: InitVar[Mapping[str, Any]] - condition: str = "" @dataclass(frozen=True) class ResolvedComponentMappingDefinition: """Represents a parsed and resolved component mapping for a stream configuration.""" - key: str + field_path: List["InterpolatedString"] value: "InterpolatedString" value_type: Optional[Type[Any]] parameters: InitVar[Mapping[str, Any]] - condition: Optional[Union["InterpolatedBoolean", str]] = "" @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) @dataclass -class ComponentsResolver: +class ComponentsResolver(ABC): """ Abstract base class for resolving components in a stream template. """ diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index 900a09d39..c3686422d 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -4,9 +4,10 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, Dict, List, Mapping, Optional, Iterable +from typing import Any, Dict, List, Mapping, Iterable -from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString +import dpath +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.types import Config @@ -47,20 +48,23 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: parameters (Mapping[str, Any]): Parameters for interpolation. """ for component_mapping in self.components_mapping: - condition = component_mapping.condition or "True" - if isinstance(component_mapping.value, (str, InterpolatedString)): interpolated_value = ( InterpolatedString.create(component_mapping.value, parameters=parameters) if isinstance(component_mapping.value, str) else component_mapping.value ) + + field_path = [ + InterpolatedString.create(path, parameters=parameters) + for path in component_mapping.field_path + ] + self._resolved_components.append( ResolvedComponentMappingDefinition( - key=component_mapping.key, + field_path=field_path, value=interpolated_value, value_type=component_mapping.value_type, - condition=InterpolatedBoolean(condition=condition, parameters=parameters), parameters=parameters, ) ) @@ -69,44 +73,6 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: f"Expected a string or InterpolatedString for value in mapping: {component_mapping}" ) - def _update_config( - self, - component_config: Dict[str, Any], - target_key: str, - target_value: Any, - condition: Optional[InterpolatedBoolean], - **kwargs: Any, - ) -> Dict[str, Any]: - """ - Recursively updates the configuration dictionary for a specific key. - - Args: - component_config (Dict[str, Any]): Component config to update. - key (str): Target key to update. - value (Any): Value to assign to the target key. - condition (Optional[InterpolatedBoolean]): Condition for applying the update. - - Returns: - Dict[str, Any]: Updated configuration dictionary. - """ - kwargs["current_component_config"] = component_config - should_update = condition.eval(self.config, **kwargs) if condition else True - - for key, value in component_config.items(): - if key == target_key and should_update: - component_config[key] = target_value - elif isinstance(value, dict): - component_config[key] = self._update_config(value, key, value, condition, **kwargs) - elif isinstance(value, list): - component_config[key] = [ - self._update_config(item, key, value, condition, **kwargs) - if isinstance(item, dict) - else item - for item in value - ] - - return component_config - def resolve_components( self, stream_template_config: Dict[str, Any] ) -> Iterable[Dict[str, Any]]: @@ -132,12 +98,8 @@ def resolve_components( value = resolved_component.value.eval( self.config, valid_types=valid_types, **kwargs ) - updated_config = self._update_config( - updated_config, - target_key=resolved_component.key, - target_value=value, - condition=resolved_component.condition, # type: ignore[arg-type] # The condition in resolved_component always has the type InterpolatedBoolean if it exists. - **kwargs, - ) + + path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path] + dpath.set(updated_config, path, value) yield updated_config diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 7591d36b9..13ad03442 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -16,38 +16,16 @@ ( [ ComponentMappingDefinition( - key="key1", + field_path=["key1"], value="{{components_values['key1']}}", value_type=str, - condition="True", - parameters={}, - ), - ComponentMappingDefinition( - key="key2", - value="{{components_values['key2']}}", - value_type=str, - condition="False", parameters={}, ), ], [{"key1": "updated_value1", "key2": "updated_value2"}], {"key1": None, "key2": None}, [{"key1": "updated_value1", "key2": None}], # Only key1 is updated - ), - ( - [ - ComponentMappingDefinition( - key="key3", - value="{{components_values['key3']}}", - value_type=str, - condition="True", - parameters={}, - ), - ], - [{"key3": "updated_value3"}], - {"key3": None}, - [{"key3": "updated_value3"}], # key3 is updated - ), + ) ], ) def test_http_components_resolver( From c30b43fe42e7e01d8396a4f141b7b023d2d933cb Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 02:04:20 +0100 Subject: [PATCH 08/19] Update source schema --- .../declarative_component_schema.yaml | 5 +- .../models/declarative_component_schema.py | 82 +++++++++++-------- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 0c31895e2..77942a0f4 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2771,7 +2771,7 @@ definitions: components_mapping: type: array items: - - "$ref": "#/definitions/ComponentMappingDefinition" + "$ref": "#/definitions/ComponentMappingDefinition" $parameters: type: object additionalProperties: true @@ -2791,6 +2791,9 @@ definitions: title: Components Resolver description: Component resolve and populates stream templates with components values. "$ref": "#/definitions/HttpComponentsResolver" + required: + - stream_template + - components_resolver interpolation: variables: - title: config diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 83f8aecae..6da5ac83f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -752,19 +754,21 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -782,7 +786,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1473,21 +1479,25 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1705,7 +1715,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1774,7 +1788,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1820,11 +1838,11 @@ class HttpComponentsResolver(BaseModel): class DynamicDeclarativeStream(BaseModel): - stream_template: Optional[DeclarativeStream] = Field( - None, description="Reference to the stream template.", title="Stream Template" + stream_template: DeclarativeStream = Field( + ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Optional[HttpComponentsResolver] = Field( - None, + components_resolver: HttpComponentsResolver = Field( + ..., description="Component resolve and populates stream templates with components values.", title="Components Resolver", ) From d0d7107ac61ce52cc61becb3afde94782dcd93af Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 02:07:16 +0100 Subject: [PATCH 09/19] Fix dynamic declarative stream schema --- .../sources/declarative/declarative_component_schema.yaml | 4 ++++ .../declarative/models/declarative_component_schema.py | 1 + 2 files changed, 5 insertions(+) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 77942a0f4..697d13961 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2783,6 +2783,9 @@ definitions: type: object description: A component that described how will be created declarative streams based on stream template. (This component is experimental. Use at your own risk.) properties: + type: + type: string + enum: [ DynamicDeclarativeStream ] stream_template: title: Stream Template description: Reference to the stream template. @@ -2792,6 +2795,7 @@ definitions: description: Component resolve and populates stream templates with components values. "$ref": "#/definitions/HttpComponentsResolver" required: + - type - stream_template - components_resolver interpolation: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 6da5ac83f..e3adfadcf 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1838,6 +1838,7 @@ class HttpComponentsResolver(BaseModel): class DynamicDeclarativeStream(BaseModel): + type: Literal["DynamicDeclarativeStream"] stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) From f6542ec11488bb9b8860c5097d6244454f24890b Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 18:51:57 +0100 Subject: [PATCH 10/19] Added unittets for dyanimc stream read --- .../declarative_component_schema.yaml | 2 +- .../parsers/model_to_component_factory.py | 2 +- .../test_http_components_resolver.py | 135 ++++++++++++++++-- 3 files changed, 127 insertions(+), 12 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 697d13961..1117625df 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2785,7 +2785,7 @@ definitions: properties: type: type: string - enum: [ DynamicDeclarativeStream ] + enum: [DynamicDeclarativeStream] stream_template: title: Stream Template description: Reference to the stream template. diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 58df95c10..fc3d3550d 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2216,7 +2216,7 @@ def create_components_mapping_definition( for path in model.field_path ] return ComponentMappingDefinition( - field_path=field_path, + field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString value=interpolated_value, value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type), parameters=model.parameters or {}, diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 13ad03442..a6f824e2e 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -8,6 +8,99 @@ ComponentMappingDefinition, HttpComponentsResolver, ) +from airbyte_cdk.sources.embedded.catalog import ( + to_configured_catalog, + to_configured_stream, +) +from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString + +_MANIFEST = { + "version": "5.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], +} @pytest.mark.parametrize( @@ -16,29 +109,25 @@ ( [ ComponentMappingDefinition( - field_path=["key1"], + field_path=[InterpolatedString.create("key1", parameters={})], value="{{components_values['key1']}}", value_type=str, parameters={}, - ), + ) ], [{"key1": "updated_value1", "key2": "updated_value2"}], {"key1": None, "key2": None}, - [{"key1": "updated_value1", "key2": None}], # Only key1 is updated + [{"key1": "updated_value1", "key2": None}], ) ], ) def test_http_components_resolver( components_mapping, retriever_data, stream_template_config, expected_result ): - # Mock the retriever to simulate reading records mock_retriever = MagicMock() mock_retriever.read_records.return_value = retriever_data - - # Use a simple dictionary for the config, as Config should be a Mapping config = {} - # Instantiate the resolver with mocked data resolver = HttpComponentsResolver( retriever=mock_retriever, config=config, @@ -46,8 +135,34 @@ def test_http_components_resolver( parameters={}, ) - # Run the resolve_components method and convert the result to a list result = list(resolver.resolve_components(stream_template_config=stream_template_config)) - - # Assert the resolved components match the expected result assert result == expected_result + + +def test_dynamic_streams_read(requests_mock): + expected_stream_names = ["item_1", "item_2"] + requests_mock.get( + "https://api.test.com/items", + json=[{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}], + ) + requests_mock.get("https://api.test.com/items/1", json={"id": "1", "name": "item_1"}) + requests_mock.get("https://api.test.com/items/2", json={"id": "2", "name": "item_2"}) + + source = ManifestDeclarativeSource(source_config=_MANIFEST) + actual_catalog = source.discover(logger=source.logger, config={}) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + records = [ + message.record + for message in source.read(MagicMock(), {}, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(actual_catalog.streams) == 2 + assert [stream.name for stream in actual_catalog.streams] == expected_stream_names + assert len(records) == 2 + assert [record.stream for record in records] == expected_stream_names From 13d0d0f302131fb29245ed9fa64559a68327e096 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 18:57:39 +0100 Subject: [PATCH 11/19] Fix formatting --- .../manifest_declarative_source.py | 3 +-- .../parsers/model_to_component_factory.py | 22 +++++++++---------- .../resolvers/components_resolver.py | 8 ++++--- .../resolvers/http_components_resolver.py | 13 ++++++----- .../test_http_components_resolver.py | 10 +++++---- 5 files changed, 30 insertions(+), 26 deletions(-) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index c9612ec9d..00db66051 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -33,14 +33,13 @@ from airbyte_cdk.sources.declarative.parsers.manifest_component_transformer import ( ManifestComponentTransformer, ) -from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING - from airbyte_cdk.sources.declarative.parsers.manifest_reference_resolver import ( ManifestReferenceResolver, ) from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.core import Stream from airbyte_cdk.sources.types import ConnectionDefinition diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 88b843055..512d65bb1 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -33,10 +33,6 @@ from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository -from airbyte_cdk.sources.declarative.resolvers import ( - HttpComponentsResolver, - ComponentMappingDefinition, -) from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus from airbyte_cdk.sources.declarative.auth import DeclarativeOauth2Authenticator, JwtAuthenticator from airbyte_cdk.sources.declarative.auth.declarative_authenticator import ( @@ -124,6 +120,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ComponentMappingDefinition as ComponentMappingDefinitionModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CompositeErrorHandler as CompositeErrorHandlerModel, ) @@ -196,6 +195,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GzipJsonDecoder as GzipJsonDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + HttpComponentsResolver as HttpComponentsResolverModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( HttpRequester as HttpRequesterModel, ) @@ -284,12 +286,6 @@ SimpleRetriever as SimpleRetrieverModel, ) from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - HttpComponentsResolver as HttpComponentsResolverModel, -) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - ComponentMappingDefinition as ComponentMappingDefinitionModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( SubstreamPartitionRouter as SubstreamPartitionRouterModel, ) @@ -304,9 +300,9 @@ XmlDecoder as XmlDecoderModel, ) from airbyte_cdk.sources.declarative.partition_routers import ( - PartitionRouter, CartesianProductStreamSlicer, ListPartitionRouter, + PartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter, ) @@ -347,6 +343,10 @@ ) from airbyte_cdk.sources.declarative.requesters.request_path import RequestPath from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod +from airbyte_cdk.sources.declarative.resolvers import ( + ComponentMappingDefinition, + HttpComponentsResolver, +) from airbyte_cdk.sources.declarative.retrievers import ( AsyncRetriever, SimpleRetriever, diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index af1d76345..54ca83f54 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -4,11 +4,13 @@ from abc import ABC, abstractmethod from dataclasses import InitVar, dataclass -from typing import Any, Dict, Mapping, Optional, Type, Union, Iterable, List -from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from typing import Any, Dict, Iterable, List, Mapping, Optional, Type, Union + from deprecated.classic import deprecated +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.source import ExperimentalClassWarning + @dataclass(frozen=True) class ComponentMappingDefinition: diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index c3686422d..d120da75b 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -4,19 +4,20 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, Dict, List, Mapping, Iterable +from typing import Any, Dict, Iterable, List, Mapping import dpath +from deprecated.classic import deprecated + from airbyte_cdk.sources.declarative.interpolation import InterpolatedString -from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever -from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( - ComponentsResolver, ComponentMappingDefinition, + ComponentsResolver, ResolvedComponentMappingDefinition, ) -from deprecated.classic import deprecated +from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever +from airbyte_cdk.sources.source import ExperimentalClassWarning +from airbyte_cdk.sources.types import Config @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index a6f824e2e..727694152 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -2,8 +2,13 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # -import pytest from unittest.mock import MagicMock + +import pytest + +from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, HttpComponentsResolver, @@ -12,9 +17,6 @@ to_configured_catalog, to_configured_stream, ) -from airbyte_cdk.models import Type -from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString _MANIFEST = { "version": "5.0.0", From c478df513189000276590c0503f3c347a2cac735 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 19:06:35 +0100 Subject: [PATCH 12/19] Update component schema --- .../sources/declarative/declarative_component_schema.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 1117625df..92962d2be 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1329,7 +1329,7 @@ definitions: type: array items: - type: string - interpolation_content: + interpolation_context: - config examples: - ["data"] @@ -2726,7 +2726,7 @@ definitions: type: array items: - type: string - interpolation_content: + interpolation_context: - config - components_values - stream_template_config From 97a932a9a0024fe813ecf53e860e06ae6c9de431 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 19:26:42 +0100 Subject: [PATCH 13/19] Add caching to components resolver --- airbyte_cdk/sources/declarative/manifest_declarative_source.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index 00db66051..652da85c4 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -334,6 +334,9 @@ def _dynamic_stream_configs( f"Expected one of {list(COMPONENTS_RESOLVER_TYPE_MAPPING.keys())}." ) + if "retriever" in components_resolver_config: + components_resolver_config["retriever"]["requester"]["use_cache"] = True + # Create a resolver for dynamic components based on type components_resolver = self._constructor.create_component( COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], components_resolver_config, config From ce9539c9f01fa75923a232119bf764713bda4026 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 29 Nov 2024 19:30:54 +0100 Subject: [PATCH 14/19] Fix description for fields --- .../sources/declarative/declarative_component_schema.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 92962d2be..b94517ec2 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2756,7 +2756,7 @@ definitions: additionalProperties: true HttpComponentsResolver: type: object - description: Component resolve and populates stream templates with components fetched via an HTTP retriever. (This component is experimental. Use at your own risk.) + description: (This component is experimental. Use at your own risk.) Component resolve and populates stream templates with components fetched via an HTTP retriever. properties: type: type: string @@ -2781,7 +2781,7 @@ definitions: - components_mapping DynamicDeclarativeStream: type: object - description: A component that described how will be created declarative streams based on stream template. (This component is experimental. Use at your own risk.) + description: (This component is experimental. Use at your own risk.) A component that described how will be created declarative streams based on stream template. properties: type: type: string From 0160353fec7bb6e233f6d5a4ebebc55b5334b6cd Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 29 Nov 2024 18:33:47 +0000 Subject: [PATCH 15/19] Auto-fix lint and format issues --- .../models/declarative_component_schema.py | 74 +++++++------------ 1 file changed, 28 insertions(+), 46 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e3adfadcf..cec3c7362 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -754,21 +752,19 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -786,9 +782,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1479,25 +1473,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1715,11 +1705,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1788,11 +1774,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], From 284241d9c0ffd9cbb9df4cc71ee48d75d17eb914 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Mon, 2 Dec 2024 22:37:38 +0100 Subject: [PATCH 16/19] Update unit tests with Maxime comments --- .../concurrent_declarative_source.py | 4 +- .../models/declarative_component_schema.py | 74 ++++----- .../parsers/manifest_component_transformer.py | 6 + .../test_http_components_resolver.py | 63 +++++--- .../test_manifest_declarative_source.py | 147 +++++++++++++++++- 5 files changed, 217 insertions(+), 77 deletions(-) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index ef01a82bd..e156290a5 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -184,8 +184,8 @@ def _group_streams( # Combine streams and dynamic_streams. Note: both cannot be empty at the same time, # and this is validated during the initialization of the source. - streams = self.resolved_manifest.get("streams", []) + self.resolved_manifest.get( - "dynamic_streams", [] + streams = self._stream_configs(self._source_config) + self._dynamic_stream_configs( + self._source_config, config ) name_to_stream_mapping = {stream["name"]: stream for stream in streams} diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e3adfadcf..cec3c7362 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -754,21 +752,19 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -786,9 +782,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1479,25 +1473,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1715,11 +1705,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1788,11 +1774,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 8cacda3d7..ed05b8e52 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -31,6 +31,12 @@ # DeclarativeStream "DeclarativeStream.retriever": "SimpleRetriever", "DeclarativeStream.schema_loader": "JsonFileSchemaLoader", + # DynamicDeclarativeStream + "DynamicDeclarativeStream.stream_template": "DeclarativeStream", + "DynamicDeclarativeStream.components_resolver": "HttpComponentsResolver", + # HttpComponentsResolver + "HttpComponentsResolver.retriever": "SimpleRetriever", + "HttpComponentsResolver.components_mapping": "ComponentMappingDefinition", # DefaultErrorHandler "DefaultErrorHandler.response_filters": "HttpResponseFilter", # DefaultPaginator diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 727694152..08a8e8c63 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -2,13 +2,16 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. # +import json from unittest.mock import MagicMock import pytest from airbyte_cdk.models import Type +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString -from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.resolvers import ( ComponentMappingDefinition, HttpComponentsResolver, @@ -17,6 +20,9 @@ to_configured_catalog, to_configured_stream, ) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + +_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} _MANIFEST = { "version": "5.0.0", @@ -141,28 +147,41 @@ def test_http_components_resolver( assert result == expected_result -def test_dynamic_streams_read(requests_mock): +def test_dynamic_streams_read(): expected_stream_names = ["item_1", "item_2"] - requests_mock.get( - "https://api.test.com/items", - json=[{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}], - ) - requests_mock.get("https://api.test.com/items/1", json={"id": "1", "name": "item_1"}) - requests_mock.get("https://api.test.com/items/2", json={"id": "2", "name": "item_2"}) - - source = ManifestDeclarativeSource(source_config=_MANIFEST) - actual_catalog = source.discover(logger=source.logger, config={}) - - configured_streams = [ - to_configured_stream(stream, primary_key=stream.source_defined_primary_key) - for stream in actual_catalog.streams - ] - configured_catalog = to_configured_catalog(configured_streams) - records = [ - message.record - for message in source.read(MagicMock(), {}, configured_catalog) - if message.type == Type.RECORD - ] + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/1"), + HttpResponse(body=json.dumps({"id": "1", "name": "item_1"})), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/items/2"), + HttpResponse(body=json.dumps({"id": "2", "name": "item_2"})), + ) + + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config={}) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + records = [ + message.record + for message in source.read(MagicMock(), {}, configured_catalog) + if message.type == Type.RECORD + ] assert len(actual_catalog.streams) == 2 assert [stream.name for stream in actual_catalog.streams] == expected_stream_names diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index 1f4b6df56..ea92bac5e 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -71,6 +71,116 @@ def use_external_yaml_spec(self): yield os.remove(yaml_path) + @pytest.fixture + def _base_manifest(self): + """Base manifest without streams or dynamic streams.""" + return { + "version": "3.8.2", + "description": "This is a sample source connector that is very valid.", + "check": {"type": "CheckStream", "stream_names": ["lists"]}, + } + + @pytest.fixture + def _declarative_stream(self): + def declarative_stream_config( + name="lists", requester_type="HttpRequester", custom_requester=None + ): + """Generates a DeclarativeStream configuration.""" + requester_config = { + "type": requester_type, + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + }, + "request_parameters": {"page_size": "{{ 10 }}"}, + } + if custom_requester: + requester_config.update(custom_requester) + + return { + "type": "DeclarativeStream", + "$parameters": { + "name": name, + "primary_key": "id", + "url_base": "https://api.sendgrid.com", + }, + "schema_loader": { + "name": "{{ parameters.stream_name }}", + "file_path": f"./source_sendgrid/schemas/{{{{ parameters.name }}}}.yaml", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": requester_config, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + } + + return declarative_stream_config + + @pytest.fixture + def _dynamic_declarative_stream(self, _declarative_stream): + """Generates a DynamicDeclarativeStream configuration.""" + return { + "type": "DynamicDeclarativeStream", + "stream_template": _declarative_stream(), + "components_resolver": { + "type": "HttpComponentsResolver", + "$parameters": { + "name": "lists", + "primary_key": "id", + "url_base": "https://api.sendgrid.com", + }, + "retriever": { + "paginator": { + "type": "DefaultPaginator", + "page_size": 10, + "page_size_option": { + "type": "RequestOption", + "inject_into": "request_parameter", + "field_name": "page_size", + }, + "page_token_option": {"type": "RequestPath"}, + "pagination_strategy": { + "type": "CursorPagination", + "cursor_value": "{{ response._metadata.next }}", + "page_size": 10, + }, + }, + "requester": { + "path": "/v3/marketing/lists", + "authenticator": { + "type": "BearerAuthenticator", + "api_token": "{{ config.apikey }}", + }, + "request_parameters": {"page_size": "{{ 10 }}"}, + }, + "record_selector": {"extractor": {"field_path": ["result"]}}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{ components_value['name'] }}", + } + ], + }, + } + def test_valid_manifest(self): manifest = { "version": "3.8.2", @@ -516,14 +626,37 @@ def test_source_missing_checker_fails_validation(self): with pytest.raises(ValidationError): ManifestDeclarativeSource(source_config=manifest) - def test_source_with_missing_streams_fails(self): - manifest = { - "version": "0.29.3", - "definitions": None, - "check": {"type": "CheckStream", "stream_names": ["lists"]}, - } + def test_source_with_missing_streams_and_dynamic_streams_fails( + self, _base_manifest, _dynamic_declarative_stream, _declarative_stream + ): + # test case for manifest without streams or dynamic streams + manifest_without_streams_and_dynamic_streams = _base_manifest with pytest.raises(ValidationError): - ManifestDeclarativeSource(source_config=manifest) + ManifestDeclarativeSource(source_config=manifest_without_streams_and_dynamic_streams) + + # test case for manifest with streams + manifest_with_streams = { + **manifest_without_streams_and_dynamic_streams, + "streams": [ + _declarative_stream(name="lists"), + _declarative_stream( + name="stream_with_custom_requester", + requester_type="CustomRequester", + custom_requester={ + "class_name": "unit_tests.sources.declarative.external_component.SampleCustomComponent", + "custom_request_parameters": {"page_size": 10}, + }, + ), + ], + } + ManifestDeclarativeSource(source_config=manifest_with_streams) + + # test case for manifest with dynamic streams + manifest_with_dynamic_streams = { + **manifest_without_streams_and_dynamic_streams, + "dynamic_streams": [_dynamic_declarative_stream], + } + ManifestDeclarativeSource(source_config=manifest_with_dynamic_streams) def test_source_with_missing_version_fails(self): manifest = { From c6dcbc887b4bff42be161ccceb8f4f969b7ffa8c Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 3 Dec 2024 04:54:18 +0100 Subject: [PATCH 17/19] Updated version for manifest in unit tests --- .../declarative/resolvers/test_http_components_resolver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py index 08a8e8c63..9e9fe225a 100644 --- a/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py +++ b/unit_tests/sources/declarative/resolvers/test_http_components_resolver.py @@ -25,7 +25,7 @@ _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z"} _MANIFEST = { - "version": "5.0.0", + "version": "6.7.0", "type": "DeclarativeSource", "check": {"type": "CheckStream", "stream_names": ["Rates"]}, "dynamic_streams": [ From 890eec1da246266ec2861cbc1bbedfd0fb3dd04d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 3 Dec 2024 05:05:06 +0100 Subject: [PATCH 18/19] Added details to ComponentMappingDefinition doc string --- .../sources/declarative/resolvers/components_resolver.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index 54ca83f54..d4eb01788 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -14,7 +14,9 @@ @dataclass(frozen=True) class ComponentMappingDefinition: - """Defines the key-value mapping configuration for a stream component.""" + """Defines the configuration for mapping a component in a stream. This class specifies + what field in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" field_path: List["InterpolatedString"] value: Union["InterpolatedString", str] @@ -24,7 +26,9 @@ class ComponentMappingDefinition: @dataclass(frozen=True) class ResolvedComponentMappingDefinition: - """Represents a parsed and resolved component mapping for a stream configuration.""" + """Defines resolved configuration for mapping a component in a stream. This class specifies + what field in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" field_path: List["InterpolatedString"] value: "InterpolatedString" From de7b6bc8da661f32a084fe5d983086c6be0a37f5 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 5 Dec 2024 15:51:48 +0100 Subject: [PATCH 19/19] Replace deprecated import --- .../sources/declarative/resolvers/components_resolver.py | 2 +- .../sources/declarative/resolvers/http_components_resolver.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index d4eb01788..5975b3082 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -6,7 +6,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Dict, Iterable, List, Mapping, Optional, Type, Union -from deprecated.classic import deprecated +from typing_extensions import deprecated from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.source import ExperimentalClassWarning diff --git a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py index d120da75b..322b43683 100644 --- a/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py @@ -7,7 +7,7 @@ from typing import Any, Dict, Iterable, List, Mapping import dpath -from deprecated.classic import deprecated +from typing_extensions import deprecated from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.resolvers.components_resolver import (