diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3fcbbf34672dc..3d378a28569b0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1750,6 +1750,45 @@ definitions: type: type: string enum: [XmlDecoder] + CustomDecoder: + title: Custom Decoder + description: Use this to implement custom decoder logic. + type: object + additionalProperties: true + required: + - type + - class_name + properties: + type: + type: string + enum: [CustomDecoder] + class_name: + title: Class Name + description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`. + type: string + additionalProperties: true + examples: + - "source_amazon_ads.components.GzipJsonlDecoder" + $parameters: + type: object + additionalProperties: true + GzipJsonDecoder: + title: GzipJson Decoder + description: Use this if the response is gzip-compressed JSON. + type: object + additionalProperties: true + required: + - type + properties: + type: + type: string + enum: [GzipJsonDecoder] + encoding: + type: string + default: utf-8 + $parameters: + type: object + additionalProperties: true ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -2404,10 +2443,12 @@ definitions: title: Decoder description: Component decoding the response so records can be extracted. 
anyOf: + - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/GzipJsonDecoder" $parameters: type: object additionalProperties: true @@ -2520,10 +2561,12 @@ definitions: title: Decoder description: Component decoding the response so records can be extracted. anyOf: + - "$ref": "#/definitions/CustomDecoder" - "$ref": "#/definitions/JsonDecoder" - "$ref": "#/definitions/JsonlDecoder" - "$ref": "#/definitions/IterableDecoder" - "$ref": "#/definitions/XmlDecoder" + - "$ref": "#/definitions/GzipJsonDecoder" $parameters: type: object additionalProperties: true diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py index b67561e989cee..7452fe99883c6 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/__init__.py @@ -3,9 +3,9 @@ # from airbyte_cdk.sources.declarative.decoders.decoder import Decoder -from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder +from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder, GzipJsonDecoder from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder -__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"] +__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"] diff --git 
a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py index b2c25c3370fed..b327577cca268 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/decoders/json_decoder.py @@ -4,6 +4,7 @@ import logging from dataclasses import InitVar, dataclass +from gzip import decompress from typing import Any, Generator, Mapping import requests @@ -30,16 +31,20 @@ def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], No """ try: body_json = response.json() - if not isinstance(body_json, list): - body_json = [body_json] - if len(body_json) == 0: - yield {} - else: - yield from body_json + yield from self.parse_body_json(body_json) except requests.exceptions.JSONDecodeError: logger.warning(f"Response cannot be parsed into json: {response.status_code=}, {response.text=}") yield {} + @staticmethod + def parse_body_json(body_json: Mapping[str, Any] | list) -> Generator[Mapping[str, Any], None, None]: + if not isinstance(body_json, list): + body_json = [body_json] + if len(body_json) == 0: + yield {} + else: + yield from body_json + @dataclass class IterableDecoder(Decoder): @@ -73,3 +78,12 @@ def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], No # https://github.com/airbytehq/airbyte-internal-issues/issues/8436 for record in response.iter_lines(): yield orjson.loads(record) + + +@dataclass +class GzipJsonDecoder(JsonDecoder): + encoding: str = "utf-8" + + def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]: + raw_string = decompress(response.content).decode(encoding=self.encoding) + yield from self.parse_body_json(orjson.loads(raw_string)) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py 
b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index eeb773b2522f6..5ebeaa80cae9f 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -689,6 +689,29 @@ class XmlDecoder(BaseModel): type: Literal['XmlDecoder'] +class CustomDecoder(BaseModel): + class Config: + extra = Extra.allow + + type: Literal['CustomDecoder'] + class_name: str = Field( + ..., + description='Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.', + examples=['source_amazon_ads.components.GzipJsonlDecoder'], + title='Class Name', + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') + + +class GzipJsonDecoder(BaseModel): + class Config: + extra = Extra.allow + + type: Literal['GzipJsonDecoder'] + encoding: Optional[str] = 'utf-8' + parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') + + class MinMaxDatetime(BaseModel): type: Literal['MinMaxDatetime'] datetime: str = Field( @@ -1634,12 +1657,19 @@ class SimpleRetriever(BaseModel): description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.', title='Partition Router', ) - decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = ( - Field( - None, - description='Component decoding the response so records can be extracted.', - title='Decoder', - ) + decoder: Optional[ + Union[ + CustomDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + GzipJsonDecoder, + ] + ] = Field( + None, + description='Component decoding the response so records can be extracted.', + title='Decoder', ) parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') @@ -1700,12 +1730,19 @@ class AsyncRetriever(BaseModel): 
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.', title='Partition Router', ) - decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = ( - Field( - None, - description='Component decoding the response so records can be extracted.', - title='Decoder', - ) + decoder: Optional[ + Union[ + CustomDecoder, + JsonDecoder, + JsonlDecoder, + IterableDecoder, + XmlDecoder, + GzipJsonDecoder, + ] + ] = Field( + None, + description='Component decoding the response so records can be extracted.', + title='Decoder', ) parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters') diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 01b76dcdb4923..096b4b697cc56 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -35,6 +35,7 @@ from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( Decoder, + GzipJsonDecoder, IterableDecoder, JsonDecoder, JsonlDecoder, @@ -72,6 +73,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import CursorPagination as CursorPaginationModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomAuthenticator as CustomAuthenticatorModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomBackoffStrategy as CustomBackoffStrategyModel +from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomDecoder as CustomDecoderModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomErrorHandler as CustomErrorHandlerModel from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomIncrementalSync as CustomIncrementalSyncModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomPaginationStrategy as CustomPaginationStrategyModel @@ -90,6 +92,7 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import GzipJsonDecoder as GzipJsonDecoderModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import HttpRequester as HttpRequesterModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import HttpResponseFilter as HttpResponseFilterModel from airbyte_cdk.sources.declarative.models.declarative_component_schema import InlineSchemaLoader as InlineSchemaLoaderModel @@ -222,6 +225,7 @@ def _init_mappings(self) -> None: CursorPaginationModel: self.create_cursor_pagination, CustomAuthenticatorModel: self.create_custom_component, CustomBackoffStrategyModel: self.create_custom_component, + CustomDecoderModel: self.create_custom_component, CustomErrorHandlerModel: self.create_custom_component, CustomIncrementalSyncModel: self.create_custom_component, CustomRecordExtractorModel: self.create_custom_component, @@ -245,6 +249,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, XmlDecoderModel: self.create_xml_decoder, @@ -1126,6 +1131,10 @@ def create_iterable_decoder(model: IterableDecoderModel, config: Config, **kwarg def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder: return 
XmlDecoder(parameters={}) + @staticmethod + def create_gzipjson_decoder(model: GzipJsonDecoderModel, config: Config, **kwargs: Any) -> GzipJsonDecoder: + return GzipJsonDecoder(parameters={}, encoding=model.encoding) + @staticmethod def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any) -> JsonFileSchemaLoader: return JsonFileSchemaLoader(file_path=model.file_path or "", config=config, parameters=model.parameters or {}) diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/decoders/test_json_decoder.py b/airbyte-cdk/python/unit_tests/sources/declarative/decoders/test_json_decoder.py index 1b9a552d6ed5a..70e4cffe97c88 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/decoders/test_json_decoder.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/decoders/test_json_decoder.py @@ -1,6 +1,7 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import gzip import json import os @@ -11,6 +12,7 @@ from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory +from airbyte_cdk.sources.declarative.decoders import GzipJsonDecoder @pytest.mark.parametrize( @@ -109,3 +111,75 @@ def get_body(): counter += 1 assert counter == lines_in_response * len(stream_slices) + +@pytest.mark.parametrize( + "encoding", + [ + "utf-8", + "utf-16", + ], + ids=["utf-8", "utf-16"], +) +def test_gzipjson_decoder(requests_mock, encoding): + response_to_compress = json.dumps([ + { + "campaignId": 214078428, + "campaignName": "sample-campaign-name-214078428", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" + }, + { + "campaignId": 44504582, + "campaignName": 
"sample-campaign-name-44504582", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" + }, + { + "campaignId": 509144838, + "campaignName": "sample-campaign-name-509144838", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" + }, + { + "campaignId": 231712082, + "campaignName": "sample-campaign-name-231712082", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" + }, + { + "campaignId": 895306040, + "campaignName": "sample-campaign-name-895306040", + "adGroupId": "6490134", + "adId": "665320125", + "targetId": "791320341", + "asin": "G000PSH142", + "advertisedAsin": "G000PSH142", + "keywordBid": "511234974", + "keywordId": "965783021" + } + ]) + body = gzip.compress(response_to_compress.encode(encoding)) + + requests_mock.register_uri("GET", "https://airbyte.io/", content=body) + response = requests.get("https://airbyte.io/") + assert len(list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response))) == 5