Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Pinterest: Fix backoff waiting time #32672

Merged
merged 3 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 5cb7e5fe-38c2-11ec-8d3d-0242ac130003
dockerImageTag: 0.8.1
dockerImageTag: 0.8.2
dockerRepository: airbyte/source-pinterest
connectorBuildOptions:
baseImage: docker.io/airbyte/python-connector-base:1.1.0@sha256:bd98f6505c6764b1b5f99d3aedc23dfc9e9af631a62533f60eb32b1d3dbab20c
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class PinterestAnalyticsReportStream(PinterestAnalyticsStream):
Details - https://developers.pinterest.com/docs/api/v5/#operation/analytics/create_report"""

http_method = "POST"
report_wait_timeout = 180
report_wait_timeout = 60 * 10
report_generation_maximum_retries = 5

@property
Expand Down Expand Up @@ -69,17 +69,17 @@ def request_params(

def backoff_max_time(func):
def wrapped(self, *args, **kwargs):
return backoff.on_exception(backoff.constant, RetryableException, max_time=self.report_wait_timeout * 60, interval=10)(func)(
return backoff.on_exception(backoff.constant, RetryableException, max_time=self.report_wait_timeout, interval=10)(func)(
self, *args, **kwargs
)

return wrapped

def backoff_max_tries(func):
def wrapped(self, *args, **kwargs):
return backoff.on_exception(backoff.expo, ReportGenerationFailure, max_tries=self.report_generation_maximum_retries)(func)(
self, *args, **kwargs
)
return backoff.on_exception(
backoff.expo, ReportGenerationFailure, max_tries=self.report_generation_maximum_retries, max_time=self.report_wait_timeout
)(func)(self, *args, **kwargs)

return wrapped

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class NonJSONResponse(Exception):
pass


class RateLimitExceeded(Exception):
pass


class PinterestStream(HttpStream, ABC):
url_base = "https://api.pinterest.com/v5/"
primary_key = "id"
Expand Down Expand Up @@ -90,7 +94,12 @@ def should_retry(self, response: requests.Response) -> bool:
def backoff_time(self, response: requests.Response) -> Optional[float]:
if response.status_code == requests.codes.too_many_requests:
self.logger.error(f"For stream {self.name} rate limit exceeded.")
return float(response.headers.get("X-RateLimit-Reset", 0))
sleep_time = float(response.headers.get("X-RateLimit-Reset", 0))
if sleep_time > 600:
raise RateLimitExceeded(
f"Rate limit exceeded for stream {self.name}. Waiting time is longer than 10 minutes: {sleep_time}s."
)
return sleep_time


class PinterestSubStream(HttpSubStream):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,14 @@ def test_check_wrong_date_connection(wrong_date_config):

@responses.activate
def test_check_connection_expired_token(test_config):
responses.add(
responses.POST,
"https://api.pinterest.com/v5/oauth/token",
status=401
)
responses.add(responses.POST, "https://api.pinterest.com/v5/oauth/token", status=401)
source = SourcePinterest()
logger_mock = MagicMock()
assert source.check_connection(logger_mock, test_config) == (False,
'Try to re-authenticate because current refresh token is not valid. ' \
'401 Client Error: Unauthorized for url: https://api.pinterest.com/v5/oauth/token')
assert source.check_connection(logger_mock, test_config) == (
False,
"Try to re-authenticate because current refresh token is not valid. "
"401 Client Error: Unauthorized for url: https://api.pinterest.com/v5/oauth/token",
)


def test_get_authenticator(test_config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
Keywords,
PinterestStream,
PinterestSubStream,
RateLimitExceeded,
UserAccountAnalytics,
)

Expand Down Expand Up @@ -148,6 +149,12 @@ def test_non_json_response(requests_mock):
"test_response, test_headers, status_code, expected",
[
({"code": 7, "message": "Some other error message"}, {"X-RateLimit-Reset": "2"}, 429, 2.0),
(
{"code": 7, "message": "Some other error message"},
{"X-RateLimit-Reset": "2000"},
429,
(RateLimitExceeded, "Rate limit exceeded for stream boards. Waiting time is longer than 10 minutes: 2000.0s."),
),
],
)
def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code, test_headers, expected):
Expand All @@ -161,8 +168,13 @@ def test_backoff_on_rate_limit_error(requests_mock, test_response, status_code,
)

response = requests.get(url)
result = stream.backoff_time(response)
assert result == expected

if isinstance(expected, tuple):
with pytest.raises(expected[0], match=expected[1]):
stream.backoff_time(response)
else:
result = stream.backoff_time(response)
assert result == expected


@pytest.mark.parametrize(
Expand Down
Loading