Skip to content

Commit

Permalink
Merge branch 'main' into daryna/low-code/pass-refresh-headers-to-oauth
Browse files Browse the repository at this point in the history
  • Loading branch information
darynaishchenko committed Jan 16, 2025
2 parents 60ecfbf + 40a9f1e commit 1da0d93
Show file tree
Hide file tree
Showing 23 changed files with 914 additions and 104 deletions.
26 changes: 26 additions & 0 deletions airbyte_cdk/sources/declarative/auth/oauth.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,13 @@ class DeclarativeOauth2Authenticator(AbstractOauth2Authenticator, DeclarativeAut
token_expiry_is_time_of_expiration: bool = False
access_token_name: Union[InterpolatedString, str] = "access_token"
access_token_value: Optional[Union[InterpolatedString, str]] = None
client_id_name: Union[InterpolatedString, str] = "client_id"
client_secret_name: Union[InterpolatedString, str] = "client_secret"
expires_in_name: Union[InterpolatedString, str] = "expires_in"
refresh_token_name: Union[InterpolatedString, str] = "refresh_token"
refresh_request_body: Optional[Mapping[str, Any]] = None
refresh_request_headers: Optional[Mapping[str, Any]] = None
grant_type_name: Union[InterpolatedString, str] = "grant_type"
grant_type: Union[InterpolatedString, str] = "refresh_token"
message_repository: MessageRepository = NoopMessageRepository()

Expand All @@ -71,8 +75,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)
else:
self._token_refresh_endpoint = None
self._client_id_name = InterpolatedString.create(self.client_id_name, parameters=parameters)
self._client_id = InterpolatedString.create(self.client_id, parameters=parameters)
self._client_secret_name = InterpolatedString.create(
self.client_secret_name, parameters=parameters
)
self._client_secret = InterpolatedString.create(self.client_secret, parameters=parameters)
self._refresh_token_name = InterpolatedString.create(
self.refresh_token_name, parameters=parameters
)
if self.refresh_token is not None:
self._refresh_token: Optional[InterpolatedString] = InterpolatedString.create(
self.refresh_token, parameters=parameters
Expand All @@ -85,6 +96,9 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.expires_in_name = InterpolatedString.create(
self.expires_in_name, parameters=parameters
)
self.grant_type_name = InterpolatedString.create(
self.grant_type_name, parameters=parameters
)
self.grant_type = InterpolatedString.create(self.grant_type, parameters=parameters)
self._refresh_request_body = InterpolatedMapping(
self.refresh_request_body or {}, parameters=parameters
Expand Down Expand Up @@ -127,18 +141,27 @@ def get_token_refresh_endpoint(self) -> Optional[str]:
return refresh_token_endpoint
return None

def get_client_id_name(self) -> str:
return self._client_id_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_client_id(self) -> str:
client_id: str = self._client_id.eval(self.config)
if not client_id:
raise ValueError("OAuthAuthenticator was unable to evaluate client_id parameter")
return client_id

def get_client_secret_name(self) -> str:
return self._client_secret_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_client_secret(self) -> str:
client_secret: str = self._client_secret.eval(self.config)
if not client_secret:
raise ValueError("OAuthAuthenticator was unable to evaluate client_secret parameter")
return client_secret

def get_refresh_token_name(self) -> str:
return self._refresh_token_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_refresh_token(self) -> Optional[str]:
return None if self._refresh_token is None else str(self._refresh_token.eval(self.config))

Expand All @@ -151,6 +174,9 @@ def get_access_token_name(self) -> str:
def get_expires_in_name(self) -> str:
return self.expires_in_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_grant_type_name(self) -> str:
return self.grant_type_name.eval(self.config) # type: ignore # eval returns a string in this context

def get_grant_type(self) -> str:
return self.grant_type.eval(self.config) # type: ignore # eval returns a string in this context

Expand Down
87 changes: 67 additions & 20 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ definitions:
properties:
type:
type: string
enum: [ CustomSchemaNormalization ]
enum: [CustomSchemaNormalization]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_<name>.<package>.<class_name>`.
Expand Down Expand Up @@ -1047,20 +1047,41 @@ definitions:
type:
type: string
enum: [OAuthAuthenticator]
client_id_name:
title: Client ID Property Name
description: The name of the property to use to refresh the `access_token`.
type: string
default: "client_id"
examples:
- custom_app_id
client_id:
title: Client ID
description: The OAuth client ID. Fill it in the user inputs.
type: string
examples:
- "{{ config['client_id }}"
- "{{ config['credentials']['client_id }}"
client_secret_name:
title: Client Secret Property Name
description: The name of the property to use to refresh the `access_token`.
type: string
default: "client_secret"
examples:
- custom_app_secret
client_secret:
title: Client Secret
description: The OAuth client secret. Fill it in the user inputs.
type: string
examples:
- "{{ config['client_secret }}"
- "{{ config['credentials']['client_secret }}"
refresh_token_name:
title: Refresh Token Property Name
description: The name of the property to use to refresh the `access_token`.
type: string
default: "refresh_token"
examples:
- custom_app_refresh_value
refresh_token:
title: Refresh Token
description: Credential artifact used to get a new access token.
Expand Down Expand Up @@ -1094,6 +1115,13 @@ definitions:
default: "expires_in"
examples:
- expires_in
grant_type_name:
title: Grant Type Property Name
description: The name of the property to use to refresh the `access_token`.
type: string
default: "grant_type"
examples:
- custom_grant_type
grant_type:
title: Grant Type
description: Specifies the OAuth2 grant type. If set to refresh_token, the refresh_token needs to be provided as well. For client_credentials, only client id and secret are required. Other grant types are not officially supported.
Expand Down Expand Up @@ -2212,15 +2240,15 @@ definitions:
Pertains to the fields defined by the connector relating to the OAuth flow.
Interpolation capabilities:
- The variables placeholders are declared as `{my_var}`.
- The nested resolution variables like `{{my_nested_var}}` is allowed as well.
- The variables placeholders are declared as `{{my_var}}`.
- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.
- The allowed interpolation context is:
+ base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}
+ base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}
+ urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}
+ urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}
+ codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}
+ base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}
+ base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}
+ urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}
+ urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}
+ codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}
Examples:
- The TikTok Marketing DeclarativeOAuth spec:
Expand All @@ -2229,12 +2257,12 @@ definitions:
"type": "object",
"additionalProperties": false,
"properties": {
"consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",
"consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",
"access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",
"access_token_params": {
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}"
"{{ auth_code_key }}": "{{ auth_code_value }}",
"{{ client_id_key }}": "{{ client_id_value }}",
"{{ client_secret_key }}": "{{ client_secret_value }}"
},
"access_token_headers": {
"Content-Type": "application/json",
Expand All @@ -2252,7 +2280,6 @@ definitions:
required:
- consent_url
- access_token_url
- extract_output
properties:
consent_url:
title: Consent URL
Expand All @@ -2261,8 +2288,8 @@ definitions:
The DeclarativeOAuth Specific string URL string template to initiate the authentication.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- https://domain.host.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}
- https://endpoint.host.com/oauth2/authorize?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{scope_key}={urlEncoder:{{scope_key}}}&{state_key}={{state_key}}&subdomain={subdomain}
- https://domain.host.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{{{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}
- https://endpoint.host.com/oauth2/authorize?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{{{redirect_uri_value}} | urlEncoder}}&{{scope_key}}={{{{scope_value}} | urlEncoder}}&{{state_key}}={{state_value}}&subdomain={{subdomain}}
scope:
title: Scopes
type: string
Expand All @@ -2277,7 +2304,7 @@ definitions:
The DeclarativeOAuth Specific URL templated string to obtain the `access_token`, `refresh_token` etc.
The placeholders are replaced during the processing to provide neccessary values.
examples:
- https://auth.host.com/oauth2/token?{client_id_key}={{client_id_key}}&{client_secret_key}={{client_secret_key}}&{auth_code_key}={{auth_code_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}
- https://auth.host.com/oauth2/token?{{client_id_key}}={{client_id_value}}&{{client_secret_key}}={{client_secret_value}}&{{auth_code_key}}={{auth_code_value}}&{{redirect_uri_key}}={{{{redirect_uri_value}} | urlEncoder}}
access_token_headers:
title: Access Token Headers
type: object
Expand All @@ -2286,7 +2313,7 @@ definitions:
The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.
examples:
- {
"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}",
"Authorization": "Basic {{ {{ client_id_value }}:{{ client_secret_value }} | base64Encoder }}",
}
access_token_params:
title: Access Token Query Params (Json Encoded)
Expand All @@ -2297,9 +2324,9 @@ definitions:
When this property is provided, the query params will be encoded as `Json` and included in the outgoing API request.
examples:
- {
"{auth_code_key}": "{{auth_code_key}}",
"{client_id_key}": "{{client_id_key}}",
"{client_secret_key}": "{{client_secret_key}}",
"{{ auth_code_key }}": "{{ auth_code_value }}",
"{{ client_id_key }}": "{{ client_id_value }}",
"{{ client_secret_key }}": "{{ client_secret_value }}",
}
extract_output:
title: Extract Output
Expand Down Expand Up @@ -2867,6 +2894,7 @@ definitions:
parser:
anyOf:
- "$ref": "#/definitions/GzipParser"
- "$ref": "#/definitions/JsonParser"
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
# PARSERS
Expand All @@ -2883,6 +2911,20 @@ definitions:
anyOf:
- "$ref": "#/definitions/JsonLineParser"
- "$ref": "#/definitions/CsvParser"
- "$ref": "#/definitions/JsonParser"
JsonParser:
title: JsonParser
description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
type: object
required:
- type
properties:
type:
type: string
enum: [JsonParser]
encoding:
type: string
default: utf-8
JsonLineParser:
type: object
required:
Expand Down Expand Up @@ -2985,6 +3027,11 @@ definitions:
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
url_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to extract the url from polling response by the completed async job.
anyOf:
- "$ref": "#/definitions/CustomRequester"
- "$ref": "#/definitions/HttpRequester"
download_requester:
description: Requester component that describes how to prepare HTTP requests to send to the source API to download the data provided by the completed async job.
anyOf:
Expand Down
43 changes: 43 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, Generator, MutableMapping, Optional

import orjson
import requests

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.utils import AirbyteTracedException

logger = logging.getLogger("airbyte")

Expand Down Expand Up @@ -42,6 +45,46 @@ def parse(
yield from self.inner_parser.parse(gzipobj)


@dataclass
class JsonParser(Parser):
encoding: str = "utf-8"

def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]:
"""
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
"""
raw_data = data.read()
body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)

if body_json is None:
raise AirbyteTracedException(
message="Response JSON data failed to be parsed. See logs for more information.",
internal_message=f"Response JSON data failed to be parsed.",
failure_type=FailureType.system_error,
)

if isinstance(body_json, list):
yield from body_json
else:
yield from [body_json]

def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
try:
return orjson.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.debug(
f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
)
return None

def _parse_json(self, raw_data: bytes) -> Optional[Any]:
try:
return json.loads(raw_data.decode(self.encoding))
except Exception as exc:
logger.error(f"Failed to parse JSON data using json library. {exc}")
return None


@dataclass
class JsonLineParser(Parser):
encoding: Optional[str] = "utf-8"
Expand Down
Loading

0 comments on commit 1da0d93

Please sign in to comment.