Skip to content

Commit

Permalink
Source google-analytics-data-api: upgrade cdk 3 (#42841)
Browse files Browse the repository at this point in the history
  • Loading branch information
aldogonzalez8 authored Aug 7, 2024
1 parent 223717a commit fd7fa00
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 235 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
dockerImageTag: 2.4.14
dockerImageTag: 2.5.0
dockerRepository: airbyte/source-google-analytics-data-api
documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-data-api
githubIssueLabel: source-google-analytics-data-api
Expand Down Expand Up @@ -51,10 +51,11 @@ data:
- language:python
- cdk:python
connectorTestSuitesOptions:
- suite: liveTests
testConnections:
- name: google-analytics-data-api_config_dev_null
id: 6cee88ef-3893-4498-9812-8b80dbfcab0e
# Running regression and acceptance at the same time can cause quota consumption problems
# - suite: liveTests
# testConnections:
# - name: google-analytics-data-api_config_dev_null
# id: 6cee88ef-3893-4498-9812-8b80dbfcab0e
- suite: unitTests
- suite: acceptanceTests
testSecrets:
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.4.14"
version = "2.5.0"
name = "source-google-analytics-data-api"
description = "Source implementation for Google Analytics Data Api."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -17,10 +17,10 @@ include = "source_google_analytics_data_api"

[tool.poetry.dependencies]
python = "^3.9,<3.12"
cryptography = "==37.0.4"
cryptography = "==42.0.5"
requests = "==2.31.0"
airbyte-cdk = "^0"
PyJWT = "==2.4.0"
airbyte-cdk = "^3"
PyJWT = "==2.8.0"
pandas = "==2.2.0"

[tool.poetry.scripts]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Any, Iterable, Mapping, Optional

import requests
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
from requests.exceptions import JSONDecodeError

from .utils import API_LIMIT_PER_HOUR
Expand All @@ -22,9 +23,8 @@ class GoogleAnalyticsApiQuotaBase:
# setting the scenario values for attrs prior to the 429 error
treshold: float = 0.1
# base attrs
should_retry: Optional[bool] = True
response_action: ResponseAction = ResponseAction.RETRY
backoff_time: Optional[int] = None
raise_on_http_errors: bool = True
# stop making new slices globally
stop_iter: bool = False
error_message = None
Expand All @@ -33,23 +33,20 @@ class GoogleAnalyticsApiQuotaBase:
"concurrentRequests": {
"error_pattern": "Exhausted concurrent requests quota.",
"backoff": 30,
"should_retry": True,
"raise_on_http_errors": False,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
},
"tokensPerProjectPerHour": {
"error_pattern": "Exhausted property tokens for a project per hour.",
"backoff": 1800,
"should_retry": True,
"raise_on_http_errors": False,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
"potentiallyThresholdedRequestsPerHour": {
"error_pattern": "Exhausted potentially thresholded requests quota.",
"backoff": 1800,
"should_retry": True,
"raise_on_http_errors": False,
"response_action": ResponseAction.RETRY,
"stop_iter": False,
"error_message": API_LIMIT_PER_HOUR,
},
Expand All @@ -61,22 +58,19 @@ class GoogleAnalyticsApiQuotaBase:
# 'tokensPerDay': {
# 'error_pattern': "___",
# "backoff": None,
# "should_retry": False,
# "raise_on_http_errors": False,
# "response_action": ResponseAction.FAIL,
# "stop_iter": True,
# },
# 'tokensPerHour': {
# 'error_pattern': "___",
# "backoff": 1800,
# "should_retry": True,
# "raise_on_http_errors": False,
# "response_action": ResponseAction.RETRY,
# "stop_iter": False,
# },
# 'serverErrorsPerProjectPerHour': {
# 'error_pattern': "___",
# "backoff": 3600,
# "should_retry": True,
# "raise_on_http_errors": False,
# "response_action": ResponseAction.RETRY,
# "stop_iter": False,
# },
}
Expand Down Expand Up @@ -105,16 +99,14 @@ def _get_known_quota_from_response(self, property_quota: Mapping[str, Any]) -> M
def _set_retry_attrs_for_quota(self, quota_name: str) -> None:
quota = self.quota_mapping.get(quota_name, {})
if quota:
self.should_retry = quota.get("should_retry")
self.raise_on_http_errors = quota.get("raise_on_http_errors")
self.response_action = quota.get("response_action")
self.stop_iter = quota.get("stop_iter")
self.backoff_time = quota.get("backoff")
self.error_message = quota.get("error_message")
self.error_message = quota.get("error_message", quota.get("error_pattern"))

def _set_default_retry_attrs(self) -> None:
self.should_retry = True
def _set_default_handle_error_attrs(self) -> None:
self.response_action = ResponseAction.RETRY
self.backoff_time = None
self.raise_on_http_errors = True
self.stop_iter = False

def _set_initial_quota(self, current_quota: Optional[Mapping[str, Any]] = None) -> None:
Expand All @@ -137,7 +129,7 @@ def _check_remaining_quota(self, current_quota: Mapping[str, Any]) -> None:
def _check_for_errors(self, response: requests.Response) -> None:
try:
# revert to default values after successful retry
self._set_default_retry_attrs()
self._set_default_handle_error_attrs()
error = response.json().get("error")
if error:
quota_name = self._get_quota_name_from_error_message(error.get("message"))
Expand All @@ -161,7 +153,7 @@ def _check_quota(self, response: requests.Response):
try:
parsed_response = response.json()
except (AttributeError, JSONDecodeError) as e:
self.logger.warn(
self.logger.warning(
f"`GoogleAnalyticsApiQuota._check_quota`: Received non JSON response from the API. Full error: {e}. Bypassing."
)
parsed_response = {}
Expand All @@ -170,7 +162,7 @@ def _check_quota(self, response: requests.Response):
if property_quota:
# return default attrs values once successfully retried
# or until another 429 error is hit
self._set_default_retry_attrs()
self._set_default_handle_error_attrs()
# reduce quota list to known kinds only
current_quota = self._get_known_quota_from_response(property_quota)
if current_quota:
Expand All @@ -183,7 +175,7 @@ def _check_quota(self, response: requests.Response):

def handle_quota(self) -> None:
"""
The function decorator is used to integrate with the `should_retry` method,
The function decorator is used to integrate with the `interpret_response` method,
or any other method that provides early access to the `response` object.
"""

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import Mapping, Type, Union

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution
from source_google_analytics_data_api.utils import WRONG_CUSTOM_REPORT_CONFIG


def get_google_analytics_data_api_base_error_mapping(report_name) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
    """
    Build the error mapping for a report stream on top of `DEFAULT_ERROR_MAPPING`.

    Every default resolution whose failure type is a config error or a system error is replaced by a
    config-error resolution that keeps the original response action and carries the
    `WRONG_CUSTOM_REPORT_CONFIG` message formatted with `report_name`. All other default resolutions
    are reused unchanged.

    :param report_name: name of the stream report, interpolated into the error message
    :return: a new mapping with the same keys as `DEFAULT_ERROR_MAPPING`
    """
    # Failure types whose default message is swapped for the report-specific one.
    overridden_failure_types = (FailureType.config_error, FailureType.system_error)

    return {
        key: (
            ErrorResolution(
                response_action=default_resolution.response_action,
                failure_type=FailureType.config_error,
                error_message=WRONG_CUSTOM_REPORT_CONFIG.format(report=report_name),
            )
            if default_resolution.failure_type in overridden_failure_types
            else default_resolution
        )
        for key, default_resolution in DEFAULT_ERROR_MAPPING.items()
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
from typing import Mapping, Type, Union

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ErrorResolution, ResponseAction

# Google Analytics documentation page about property IDs; substituted into MESSAGE as `property_id_docs_url`.
PROPERTY_ID_DOCS_URL = "https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id"
# Error message template; expects the `property_id` and `property_id_docs_url` format fields.
MESSAGE = "Incorrect Property ID: {property_id}. Access was denied to the property ID entered. Check your access to the Property ID or use Google Analytics {property_id_docs_url} to find your Property ID."


def get_google_analytics_data_api_metadata_error_mapping(property_id) -> Mapping[Union[int, str, Type[Exception]], ErrorResolution]:
    """
    Build the error mapping used for metadata requests on top of `DEFAULT_ERROR_MAPPING`.

    The 403 (forbidden) and 400 (bad request) entries are overridden with a failing config-error
    resolution whose message names the given property ID and links to the property ID documentation.

    :param property_id: property ID interpolated into the error message
    :return: `DEFAULT_ERROR_MAPPING` merged with the 403 and 400 overrides
    """
    friendly_message = MESSAGE.format(property_id=property_id, property_id_docs_url=PROPERTY_ID_DOCS_URL)

    # Both statuses get the same resolution; each key receives its own ErrorResolution instance.
    overrides = {
        status_code: ErrorResolution(
            response_action=ResponseAction.FAIL,
            failure_type=FailureType.config_error,
            error_message=friendly_message,
        )
        for status_code in (403, 400)
    }
    return DEFAULT_ERROR_MAPPING | overrides
Loading

0 comments on commit fd7fa00

Please sign in to comment.