Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(airbyte-cdk): add custom decoder #48450

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1750,6 +1750,45 @@ definitions:
type:
type: string
enum: [XmlDecoder]
CustomDecoder:
title: Custom Decoder
description: Use this to implement custom decoder logic.
type: object
additionalProperties: true
required:
- type
- class_name
properties:
type:
type: string
enum: [CustomDecoder]
class_name:
title: Class Name
description: Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.
type: string
additionalProperties: true
examples:
- "source_amazon_ads.components.GzipJsonlDecoder"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put this in the CDK? It feels like an interesting pattern that could be composed (for example, GZIP with JSONL or GZIP with CSV)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazon ads has some specific decompress with utf :

response = self._send_http_request(url=url, profile_id=profile_id, is_download_report=True)
response.raise_for_status()
raw_string = decompress(response.content).decode("utf")
return json.loads(raw_string)

I can add a GzipJsonDecoder with optional encoding field (I guess it would be utf-8 by default). WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense to me, thanks @artem1205 !

$parameters:
type: object
additionalProperties: true
GzipJsonDecoder:
title: GzipJson Decoder
description: Use this if the response is Gzip compressed Json.
type: object
additionalProperties: true
required:
- type
properties:
type:
type: string
enum: [GzipJsonDecoder]
encoding:
type: string
default: utf-8
$parameters:
type: object
additionalProperties: true
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down Expand Up @@ -2404,10 +2443,12 @@ definitions:
title: Decoder
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -2520,10 +2561,12 @@ definitions:
title: Decoder
description: Component decoding the response so records can be extracted.
anyOf:
- "$ref": "#/definitions/CustomDecoder"
- "$ref": "#/definitions/JsonDecoder"
- "$ref": "#/definitions/JsonlDecoder"
- "$ref": "#/definitions/IterableDecoder"
- "$ref": "#/definitions/XmlDecoder"
- "$ref": "#/definitions/GzipJsonDecoder"
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#

from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder, IterableDecoder, GzipJsonDecoder
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import PaginationDecoderDecorator
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder

__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
__all__ = ["Decoder", "JsonDecoder", "JsonlDecoder", "IterableDecoder", "GzipJsonDecoder", "NoopDecoder", "PaginationDecoderDecorator", "XmlDecoder"]
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging
from dataclasses import InitVar, dataclass
from gzip import decompress
from typing import Any, Generator, Mapping

import requests
Expand All @@ -30,16 +31,20 @@ def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], No
"""
try:
body_json = response.json()
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json
yield from self.parse_body_json(body_json)
except requests.exceptions.JSONDecodeError:
logger.warning(f"Response cannot be parsed into json: {response.status_code=}, {response.text=}")
yield {}

@staticmethod
def parse_body_json(body_json: Mapping[str, Any] | list) -> Generator[Mapping[str, Any], None, None]:
if not isinstance(body_json, list):
body_json = [body_json]
if len(body_json) == 0:
yield {}
else:
yield from body_json


@dataclass
class IterableDecoder(Decoder):
Expand Down Expand Up @@ -73,3 +78,12 @@ def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], No
# https://github.com/airbytehq/airbyte-internal-issues/issues/8436
for record in response.iter_lines():
yield orjson.loads(record)


@dataclass
class GzipJsonDecoder(JsonDecoder):
encoding: str = "utf-8"

def decode(self, response: requests.Response) -> Generator[Mapping[str, Any], None, None]:
raw_string = decompress(response.content).decode(encoding=self.encoding)
yield from self.parse_body_json(orjson.loads(raw_string))
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,29 @@ class XmlDecoder(BaseModel):
type: Literal['XmlDecoder']


class CustomDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal['CustomDecoder']
class_name: str = Field(
...,
description='Fully-qualified name of the class that will be implementing the custom decoding. Has to be a sub class of Decoder. The format is `source_<name>.<package>.<class_name>`.',
examples=['source_amazon_ads.components.GzipJsonlDecoder'],
title='Class Name',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class GzipJsonDecoder(BaseModel):
class Config:
extra = Extra.allow

type: Literal['GzipJsonDecoder']
encoding: Optional[str] = 'utf-8'
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class MinMaxDatetime(BaseModel):
type: Literal['MinMaxDatetime']
datetime: str = Field(
Expand Down Expand Up @@ -1634,12 +1657,19 @@ class SimpleRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')

Expand Down Expand Up @@ -1700,12 +1730,19 @@ class AsyncRetriever(BaseModel):
description='PartitionRouter component that describes how to partition the stream, enabling incremental syncs and checkpointing.',
title='Partition Router',
)
decoder: Optional[Union[JsonDecoder, JsonlDecoder, IterableDecoder, XmlDecoder]] = (
Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
decoder: Optional[
Union[
CustomDecoder,
JsonDecoder,
JsonlDecoder,
IterableDecoder,
XmlDecoder,
GzipJsonDecoder,
]
] = Field(
None,
description='Component decoding the response so records can be extracted.',
title='Decoder',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
from airbyte_cdk.sources.declarative.decoders import (
Decoder,
GzipJsonDecoder,
IterableDecoder,
JsonDecoder,
JsonlDecoder,
Expand Down Expand Up @@ -72,6 +73,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CursorPagination as CursorPaginationModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomAuthenticator as CustomAuthenticatorModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomBackoffStrategy as CustomBackoffStrategyModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomDecoder as CustomDecoderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomErrorHandler as CustomErrorHandlerModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomIncrementalSync as CustomIncrementalSyncModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import CustomPaginationStrategy as CustomPaginationStrategyModel
Expand All @@ -90,6 +92,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import GzipJsonDecoder as GzipJsonDecoderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import HttpRequester as HttpRequesterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import HttpResponseFilter as HttpResponseFilterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import InlineSchemaLoader as InlineSchemaLoaderModel
Expand Down Expand Up @@ -222,6 +225,7 @@ def _init_mappings(self) -> None:
CursorPaginationModel: self.create_cursor_pagination,
CustomAuthenticatorModel: self.create_custom_component,
CustomBackoffStrategyModel: self.create_custom_component,
CustomDecoderModel: self.create_custom_component,
CustomErrorHandlerModel: self.create_custom_component,
CustomIncrementalSyncModel: self.create_custom_component,
CustomRecordExtractorModel: self.create_custom_component,
Expand All @@ -245,6 +249,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -1126,6 +1131,10 @@ def create_iterable_decoder(model: IterableDecoderModel, config: Config, **kwarg
def create_xml_decoder(model: XmlDecoderModel, config: Config, **kwargs: Any) -> XmlDecoder:
return XmlDecoder(parameters={})

@staticmethod
def create_gzipjson_decoder(model: GzipJsonDecoderModel, config: Config, **kwargs: Any) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

@staticmethod
def create_json_file_schema_loader(model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any) -> JsonFileSchemaLoader:
return JsonFileSchemaLoader(file_path=model.file_path or "", config=config, parameters=model.parameters or {})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import gzip
import json
import os

Expand All @@ -11,6 +12,7 @@
from airbyte_cdk.sources.declarative.decoders.json_decoder import JsonDecoder, JsonlDecoder
from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ModelToComponentFactory
from sources.declarative.decoders import GzipJsonDecoder


@pytest.mark.parametrize(
Expand Down Expand Up @@ -109,3 +111,75 @@ def get_body():
counter += 1

assert counter == lines_in_response * len(stream_slices)

@pytest.mark.parametrize(
"encoding",
[
"utf-8",
"utf",
],
ids=["utf-8", "utf"],
)
def test_gzipjson_decoder(requests_mock, encoding):
response_to_compress = json.dumps([
{
"campaignId": 214078428,
"campaignName": "sample-campaign-name-214078428",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"advertisedAsin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 44504582,
"campaignName": "sample-campaign-name-44504582",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"advertisedAsin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 509144838,
"campaignName": "sample-campaign-name-509144838",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"advertisedAsin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 231712082,
"campaignName": "sample-campaign-name-231712082",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"advertisedAsin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
},
{
"campaignId": 895306040,
"campaignName": "sample-campaign-name-895306040",
"adGroupId": "6490134",
"adId": "665320125",
"targetId": "791320341",
"asin": "G000PSH142",
"advertisedAsin": "G000PSH142",
"keywordBid": "511234974",
"keywordId": "965783021"
}
])
body = gzip.compress(response_to_compress.encode(encoding))

requests_mock.register_uri("GET", "https://airbyte.io/", content=body)
response = requests.get("https://airbyte.io/")
assert len(list(GzipJsonDecoder(parameters={}, encoding=encoding).decode(response))) == 5
Loading