diff --git a/airbyte-integrations/connectors/source-recharge/metadata.yaml b/airbyte-integrations/connectors/source-recharge/metadata.yaml index fcb7b74a2a5b0..b92d8f420bd4d 100644 --- a/airbyte-integrations/connectors/source-recharge/metadata.yaml +++ b/airbyte-integrations/connectors/source-recharge/metadata.yaml @@ -7,7 +7,7 @@ data: connectorBuildOptions: baseImage: docker.io/airbyte/python-connector-base:2.0.0@sha256:c44839ba84406116e8ba68722a0f30e8f6e7056c726f447681bb9e9ece8bd916 definitionId: 45d2e135-2ede-49e1-939f-3e3ec357a65e - dockerImageTag: 2.2.0 + dockerImageTag: 2.3.0 dockerRepository: airbyte/source-recharge githubIssueLabel: source-recharge icon: recharge.svg diff --git a/airbyte-integrations/connectors/source-recharge/poetry.lock b/airbyte-integrations/connectors/source-recharge/poetry.lock index f3dee58b22fec..ae032d221cbe4 100644 --- a/airbyte-integrations/connectors/source-recharge/poetry.lock +++ b/airbyte-integrations/connectors/source-recharge/poetry.lock @@ -2,13 +2,13 @@ [[package]] name = "airbyte-cdk" -version = "2.4.0" +version = "3.7.0" description = "A framework for writing Airbyte Connectors." optional = false python-versions = "<4.0,>=3.9" files = [ - {file = "airbyte_cdk-2.4.0-py3-none-any.whl", hash = "sha256:39470b2fe97f28959fcecb839d3080a8aba4a64a29dddf54a39f11f93f9f9ef7"}, - {file = "airbyte_cdk-2.4.0.tar.gz", hash = "sha256:f973d2e17a6dd0416c4395139e2761a10b38aafa61e097eaacffebbe6164ef45"}, + {file = "airbyte_cdk-3.7.0-py3-none-any.whl", hash = "sha256:207b800b18f76ad264bb18d198b9e267694a0b4b638af19cf01f220f0f05abc7"}, + {file = "airbyte_cdk-3.7.0.tar.gz", hash = "sha256:29dd5e530564c2ba3f31d6d2d1b96b9690d94910c99d03a5aad09897aea9e973"}, ] [package.dependencies] @@ -1420,4 +1420,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9,<3.12" -content-hash = "b5da122b53d20f7fe76a2f84c3be8bed2401714bbcba5de48ec6a4fcd3ef1851" +content-hash = "4065abb5373712684c1795e3148ca4b6b3ee0e40de36953b285309fdb836ce13" diff --git a/airbyte-integrations/connectors/source-recharge/pyproject.toml b/airbyte-integrations/connectors/source-recharge/pyproject.toml index 4ca3caea39b3f..c2299417273c2 100644 --- a/airbyte-integrations/connectors/source-recharge/pyproject.toml +++ b/airbyte-integrations/connectors/source-recharge/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.2.0" +version = "2.3.0" name = "source-recharge" description = "Source implementation for Recharge." authors = [ "Airbyte ",] @@ -17,7 +17,7 @@ include = "source_recharge" [tool.poetry.dependencies] python = "^3.9,<3.12" -airbyte-cdk = "^2" +airbyte-cdk = "^3" freezegun = "^1.4.0" [tool.poetry.scripts] diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/components/recharge_error_handler.py b/airbyte-integrations/connectors/source-recharge/source_recharge/components/recharge_error_handler.py new file mode 100644 index 0000000000000..80d19ac2eca28 --- /dev/null +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/components/recharge_error_handler.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import logging +from typing import Optional, Union + +from airbyte_cdk.sources.streams.http.error_handlers import HttpStatusErrorHandler +from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, FailureType, ResponseAction +from requests import Response + + +class RechargeErrorHandler(HttpStatusErrorHandler): + def __init__(self, logger: logging.Logger) -> None: + self.logger = logger + super().__init__(logger=logger) + + def interpret_response(self, response_or_exception: Optional[Union[Response, Exception]] = None) -> ErrorResolution: + + if isinstance(response_or_exception, Response): + content_length = int(response_or_exception.headers.get("Content-Length", 0)) + incomplete_data_response = response_or_exception.status_code == 200 and content_length > len(response_or_exception.content) + if incomplete_data_response: + return ErrorResolution( + response_action=ResponseAction.RETRY, + failure_type=FailureType.transient_error, + error_message="The response is incomplete, retrying the request.", + ) + + return super().interpret_response(response_or_exception) diff --git a/airbyte-integrations/connectors/source-recharge/source_recharge/streams.py b/airbyte-integrations/connectors/source-recharge/source_recharge/streams.py index dfdae52526eb5..39b44c35cf3d8 100644 --- a/airbyte-integrations/connectors/source-recharge/source_recharge/streams.py +++ b/airbyte-integrations/connectors/source-recharge/source_recharge/streams.py @@ -9,8 +9,10 @@ import pendulum import requests from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from source_recharge.components.recharge_error_handler import RechargeErrorHandler class ApiVersion(Enum): @@ -113,12 +115,8 @@ def get_stream_data(self, response_data: Any) -> List[dict]: else: return [response_data] - def should_retry(self, response: requests.Response) -> bool: - content_length = int(response.headers.get("Content-Length", 0)) - incomplete_data_response = response.status_code == 200 and content_length > len(response.content) - if incomplete_data_response: - return True - return super().should_retry(response) + def get_error_handler(self) -> ErrorHandler: + return RechargeErrorHandler(logger=self.logger) def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: start_date_value = (stream_state or {}).get(self.cursor_field, self._start_date) if self.cursor_field else self._start_date diff --git a/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py index b10abebed9fda..ce44c9bfcdfa6 100644 --- a/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-recharge/unit_tests/test_streams.py @@ -8,6 +8,7 @@ import pytest import requests +from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction from source_recharge.source import Orders, RechargeTokenAuthenticator, SourceRecharge @@ -69,23 +70,23 @@ def test_path(self, config, stream_cls, stream_type, expected) -> None: assert expected == result @pytest.mark.parametrize( - ("http_status", "headers", "should_retry"), + ("http_status", "headers", "expected_action"), [ - (HTTPStatus.OK, {"Content-Length": 256}, True), - (HTTPStatus.BAD_REQUEST, {}, False), - (HTTPStatus.TOO_MANY_REQUESTS, {}, True), - (HTTPStatus.INTERNAL_SERVER_ERROR, {}, True), - (HTTPStatus.FORBIDDEN, {}, False), + (HTTPStatus.OK, {"Content-Length": 256}, ResponseAction.RETRY), + (HTTPStatus.BAD_REQUEST, {}, ResponseAction.FAIL), + (HTTPStatus.TOO_MANY_REQUESTS, {}, ResponseAction.RATE_LIMITED), + (HTTPStatus.INTERNAL_SERVER_ERROR, {}, ResponseAction.RETRY), + (HTTPStatus.FORBIDDEN, {}, ResponseAction.FAIL), ], ) - def test_should_retry(self, config, http_status, headers, should_retry) -> None: + def test_should_retry(self, config, http_status, headers, expected_action) -> None: response = requests.Response() response.status_code = http_status response._content = b"" response.headers = headers stream = Orders(config, authenticator=None) - assert stream.should_retry(response) == should_retry - + error_resolution = stream.get_error_handler().interpret_response(response) + error_resolution.response_action == expected_action class TestFullRefreshStreams: def generate_records(self, stream_name, count) -> Union[Mapping[str, List[Mapping[str, Any]]], Mapping[str, Any]]: diff --git a/docs/integrations/sources/recharge.md b/docs/integrations/sources/recharge.md index 56d26e5658822..1eb08f77c955b 100644 --- a/docs/integrations/sources/recharge.md +++ b/docs/integrations/sources/recharge.md @@ -79,6 +79,7 @@ The Recharge connector should gracefully handle Recharge API limitations under n | Version | Date | Pull Request | Subject | |:--------|:-----------| :------------------------------------------------------- |:-------------------------------------------------------------------------------------------------------------------------------| +| 2.3.0 | 2024-07-17 | [42076](https://github.com/airbytehq/airbyte/pull/42076) | Migrate to CDK v3.7.0 | | 2.2.0 | 2024-07-17 | [42075](https://github.com/airbytehq/airbyte/pull/42075) | Migrate to CDK v2.4.0 | | 2.1.0 | 2024-07-17 | [42069](https://github.com/airbytehq/airbyte/pull/42069) | Migrate to CDK v1.8.0 | | 2.0.6 | 2024-07-13 | [41748](https://github.com/airbytehq/airbyte/pull/41748) | Update dependencies |