Skip to content

Commit

Permalink
chore: simplify RequestOption construction in component factory (#303)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Jan 31, 2025
1 parent fd919b1 commit 24264a0
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,8 @@ def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[
}
return names_to_types[value_type]

@staticmethod
def create_api_key_authenticator(
self,
model: ApiKeyAuthenticatorModel,
config: Config,
token_provider: Optional[TokenProvider] = None,
Expand All @@ -732,10 +732,8 @@ def create_api_key_authenticator(
)

request_option = (
RequestOption(
inject_into=RequestOptionType(model.inject_into.inject_into.value),
field_name=model.inject_into.field_name,
parameters=model.parameters or {},
self._create_component_from_model(
model.inject_into, config, parameters=model.parameters or {}
)
if model.inject_into
else RequestOption(
Expand All @@ -744,6 +742,7 @@ def create_api_key_authenticator(
parameters=model.parameters or {},
)
)

return ApiKeyAuthenticator(
token_provider=(
token_provider
Expand Down Expand Up @@ -825,7 +824,7 @@ def create_session_token_authenticator(
token_provider=token_provider,
)
else:
return ModelToComponentFactory.create_api_key_authenticator(
return self.create_api_key_authenticator(
ApiKeyAuthenticatorModel(
type="ApiKeyAuthenticator",
api_token="",
Expand Down Expand Up @@ -1272,19 +1271,15 @@ def create_datetime_based_cursor(
)

end_time_option = (
RequestOption(
inject_into=RequestOptionType(model.end_time_option.inject_into.value),
field_name=model.end_time_option.field_name,
parameters=model.parameters or {},
self._create_component_from_model(
model.end_time_option, config, parameters=model.parameters or {}
)
if model.end_time_option
else None
)
start_time_option = (
RequestOption(
inject_into=RequestOptionType(model.start_time_option.inject_into.value),
field_name=model.start_time_option.field_name,
parameters=model.parameters or {},
self._create_component_from_model(
model.start_time_option, config, parameters=model.parameters or {}
)
if model.start_time_option
else None
Expand Down Expand Up @@ -1358,19 +1353,15 @@ def create_declarative_stream(
cursor_model = model.incremental_sync

end_time_option = (
RequestOption(
inject_into=RequestOptionType(cursor_model.end_time_option.inject_into.value),
field_name=cursor_model.end_time_option.field_name,
parameters=cursor_model.parameters or {},
self._create_component_from_model(
cursor_model.end_time_option, config, parameters=cursor_model.parameters or {}
)
if cursor_model.end_time_option
else None
)
start_time_option = (
RequestOption(
inject_into=RequestOptionType(cursor_model.start_time_option.inject_into.value),
field_name=cursor_model.start_time_option.field_name,
parameters=cursor_model.parameters or {},
self._create_component_from_model(
cursor_model.start_time_option, config, parameters=cursor_model.parameters or {}
)
if cursor_model.start_time_option
else None
Expand Down Expand Up @@ -1879,16 +1870,11 @@ def create_jwt_authenticator(
additional_jwt_payload=model.additional_jwt_payload,
)

@staticmethod
def create_list_partition_router(
model: ListPartitionRouterModel, config: Config, **kwargs: Any
self, model: ListPartitionRouterModel, config: Config, **kwargs: Any
) -> ListPartitionRouter:
request_option = (
RequestOption(
inject_into=RequestOptionType(model.request_option.inject_into.value),
field_name=model.request_option.field_name,
parameters=model.parameters or {},
)
self._create_component_from_model(model.request_option, config)
if model.request_option
else None
)
Expand Down Expand Up @@ -2072,10 +2058,24 @@ def create_request_option(
model: RequestOptionModel, config: Config, **kwargs: Any
) -> RequestOption:
inject_into = RequestOptionType(model.inject_into.value)
field_path: Optional[List[Union[InterpolatedString, str]]] = model.field_path # type: ignore
field_name = model.field_name if model.field_name else None
field_path: Optional[List[Union[InterpolatedString, str]]] = (
[
InterpolatedString.create(segment, parameters=kwargs.get("parameters", {}))
for segment in model.field_path
]
if model.field_path
else None
)
field_name = (
InterpolatedString.create(model.field_name, parameters=kwargs.get("parameters", {}))
if model.field_name
else None
)
return RequestOption(
field_name=field_name, field_path=field_path, inject_into=inject_into, parameters={}
field_name=field_name,
field_path=field_path,
inject_into=inject_into,
parameters=kwargs.get("parameters", {}),
)

def create_record_selector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,8 +593,8 @@ def test_list_based_stream_slicer_with_values_defined_in_config():
cursor_field: repository
request_option:
type: RequestOption
inject_into: header
field_name: repository
inject_into: body_json
field_path: ["repository", "id"]
"""
parsed_manifest = YamlDeclarativeSource._parse(content)
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
Expand All @@ -610,8 +610,10 @@ def test_list_based_stream_slicer_with_values_defined_in_config():

assert isinstance(partition_router, ListPartitionRouter)
assert partition_router.values == ["airbyte", "airbyte-cloud"]
assert partition_router.request_option.inject_into == RequestOptionType.header
assert partition_router.request_option.field_name.eval(config=input_config) == "repository"
assert partition_router.request_option.inject_into == RequestOptionType.body_json
for field in partition_router.request_option.field_path:
assert isinstance(field, InterpolatedString)
assert len(partition_router.request_option.field_path) == 2


def test_create_substream_partition_router():
Expand Down Expand Up @@ -714,7 +716,7 @@ def test_datetime_based_cursor():
end_time_option:
type: RequestOption
inject_into: body_json
field_name: "before_{{ parameters['cursor_field'] }}"
field_path: ["before_{{ parameters['cursor_field'] }}"]
partition_field_start: star
partition_field_end: en
"""
Expand Down Expand Up @@ -743,7 +745,9 @@ def test_datetime_based_cursor():
== "since_updated_at"
)
assert stream_slicer.end_time_option.inject_into == RequestOptionType.body_json
assert stream_slicer.end_time_option.field_name.eval({}) == "before_created_at"
assert [field.eval({}) for field in stream_slicer.end_time_option.field_path] == [
"before_created_at"
]
assert stream_slicer._partition_field_start.eval({}) == "star"
assert stream_slicer._partition_field_end.eval({}) == "en"

Expand Down Expand Up @@ -904,8 +908,8 @@ def test_resumable_full_refresh_stream():
type: DefaultPaginator
page_size_option:
type: RequestOption
inject_into: request_parameter
field_name: page_size
inject_into: body_json
field_path: ["variables", "page_size"]
page_token_option:
type: RequestPath
pagination_strategy:
Expand Down Expand Up @@ -1003,11 +1007,10 @@ def test_resumable_full_refresh_stream():

assert isinstance(stream.retriever.paginator, DefaultPaginator)
assert isinstance(stream.retriever.paginator.decoder, PaginationDecoderDecorator)
assert stream.retriever.paginator.page_size_option.field_name.eval(input_config) == "page_size"
assert (
stream.retriever.paginator.page_size_option.inject_into
== RequestOptionType.request_parameter
)
for string in stream.retriever.paginator.page_size_option.field_path:
assert isinstance(string, InterpolatedString)
assert len(stream.retriever.paginator.page_size_option.field_path) == 2
assert stream.retriever.paginator.page_size_option.inject_into == RequestOptionType.body_json
assert isinstance(stream.retriever.paginator.page_token_option, RequestPath)
assert stream.retriever.paginator.url_base.string == "https://api.sendgrid.com/v3/"
assert stream.retriever.paginator.url_base.default == "https://api.sendgrid.com/v3/"
Expand Down Expand Up @@ -2509,7 +2512,6 @@ def test_merge_incremental_and_partition_router(incremental, partition_router, e

assert isinstance(stream, DeclarativeStream)
assert isinstance(stream.retriever, SimpleRetriever)
print(stream.retriever.stream_slicer)
assert isinstance(stream.retriever.stream_slicer, expected_type)

if incremental and partition_router:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,8 @@ def test_manifest_without_at_least_one_stream(self):
"page_size": 10,
"page_size_option": {
"type": "RequestOption",
"inject_into": "request_parameter",
"field_name": "page_size",
"inject_into": "request_body",
"field_path": ["variables", "page_size"],
},
"page_token_option": {"type": "RequestPath"},
"pagination_strategy": {
Expand Down

0 comments on commit 24264a0

Please sign in to comment.