Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code): added keys replace transformation #183

Merged
45 changes: 45 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1241,6 +1241,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
state_migrations:
title: State Migrations
description: Array of state migrations to be applied on the input state
Expand Down Expand Up @@ -1785,6 +1786,7 @@ definitions:
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/KeysReplace"
schema_type_identifier:
"$ref": "#/definitions/SchemaTypeIdentifier"
$parameters:
Expand Down Expand Up @@ -1883,6 +1885,49 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeysReplace:
title: Keys Replace
description: A transformation that replaces symbols in keys.
type: object
required:
- type
- old
- new
properties:
type:
type: string
enum: [KeysReplace]
old:
type: string
title: Old value
description: Old value to replace.
examples:
- " "
- "{{ record.id }}"
- "{{ config['id'] }}"
- "{{ stream_slice['id'] }}"
interpolation_context:
- config
- record
- stream_state
- stream_slice
new:
type: string
title: New value
description: New value to set.
examples:
- "_"
- "{{ record.id }}"
- "{{ config['id'] }}"
- "{{ stream_slice['id'] }}"
interpolation_context:
- config
- record
- stream_state
- stream_slice
$parameters:
type: object
additionalProperties: true
IterableDecoder:
title: Iterable Decoder
description: Use this if the response consists of strings separated by new lines (`\n`). The Decoder will wrap each row into a JSON object with the `record` key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,23 @@ class KeysToSnakeCase(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeysReplace(BaseModel):
type: Literal["KeysReplace"]
old: str = Field(
...,
description="Old value to replace.",
examples=[" ", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
title="Old value",
)
new: str = Field(
...,
description="New value to set.",
examples=["_", "{{ record.id }}", "{{ config['id'] }}", "{{ stream_slice['id'] }}"],
title="New value",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FlattenFields(BaseModel):
type: Literal["FlattenFields"]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down Expand Up @@ -1701,6 +1718,7 @@ class Config:
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down Expand Up @@ -1875,6 +1893,7 @@ class DynamicSchemaLoader(BaseModel):
KeysToLower,
KeysToSnakeCase,
FlattenFields,
KeysReplace,
]
]
] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtPayload as JwtPayloadModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysReplace as KeysReplaceModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToLower as KeysToLowerModel,
)
Expand Down Expand Up @@ -417,6 +420,9 @@
from airbyte_cdk.sources.declarative.transformations.flatten_fields import (
FlattenFields,
)
from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)
from airbyte_cdk.sources.declarative.transformations.keys_to_lower_transformation import (
KeysToLowerTransformation,
)
Expand Down Expand Up @@ -509,6 +515,7 @@ def _init_mappings(self) -> None:
GzipParserModel: self.create_gzip_parser,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
KeysReplaceModel: self.create_keys_replace_transformation,
FlattenFieldsModel: self.create_flatten_fields,
IterableDecoderModel: self.create_iterable_decoder,
XmlDecoderModel: self.create_xml_decoder,
Expand Down Expand Up @@ -630,6 +637,13 @@ def create_keys_to_snake_transformation(
) -> KeysToSnakeCaseTransformation:
return KeysToSnakeCaseTransformation()

def create_keys_replace_transformation(
self, model: KeysReplaceModel, config: Config, **kwargs: Any
) -> KeysReplaceTransformation:
return KeysReplaceTransformation(
old=model.old, new=model.new, parameters=model.parameters or {}
)

def create_flatten_fields(
self, model: FlattenFieldsModel, config: Config, **kwargs: Any
) -> FlattenFields:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, Mapping, Optional

from airbyte_cdk import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysReplaceTransformation(RecordTransformation):
"""
Transformation that applies keys names replacement.

Example usage:
- type: KeysReplace
old: " "
new: "_"
Result:
from: {"created time": ..., "customer id": ..., "user id": ...}
to: {"created_time": ..., "customer_id": ..., "user_id": ...}
"""

old: str
new: str
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._old = InterpolatedString.create(self.old, parameters=parameters)
self._new = InterpolatedString.create(self.new, parameters=parameters)

def transform(
self,
record: Dict[str, Any],
config: Optional[Config] = None,
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> None:
if config is None:
config = {}

kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
old_key = str(self._old.eval(config, **kwargs))
new_key = str(self._new.eval(config, **kwargs))

def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
result = {}
for key, value in data.items():
updated_key = key.replace(old_key, new_key)
if isinstance(value, dict):
result[updated_key] = _transform(value)
else:
result[updated_key] = value
return result

transformed_record = _transform(record)
record.clear()
record.update(transformed_record)
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import pytest

from airbyte_cdk.sources.declarative.transformations.keys_replace_transformation import (
KeysReplaceTransformation,
)

_ANY_VALUE = -1


@pytest.mark.parametrize(
[
"input_record",
"config",
"stream_state",
"stream_slice",
"keys_replace_config",
"expected_record",
],
[
pytest.param(
{"date time": _ANY_VALUE, "customer id": _ANY_VALUE},
{},
{},
{},
{"old": " ", "new": "_"},
{"date_time": _ANY_VALUE, "customer_id": _ANY_VALUE},
id="simple keys replace config",
),
pytest.param(
{
"customer_id": 111111,
"customer_name": "MainCustomer",
"field_1_111111": _ANY_VALUE,
"field_2_111111": _ANY_VALUE,
},
{},
{},
{},
{"old": '{{ record["customer_id"] }}', "new": '{{ record["customer_name"] }}'},
{
"customer_id": 111111,
"customer_name": "MainCustomer",
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from record",
),
pytest.param(
{"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
{},
{},
{"customer_name": "MainCustomer"},
{"old": '{{ record["customer_id"] }}', "new": '{{ stream_slice["customer_name"] }}'},
{
"customer_id": 111111,
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from slice",
),
pytest.param(
{"customer_id": 111111, "field_1_111111": _ANY_VALUE, "field_2_111111": _ANY_VALUE},
{"customer_name": "MainCustomer"},
{},
{},
{"old": '{{ record["customer_id"] }}', "new": '{{ config["customer_name"] }}'},
{
"customer_id": 111111,
"field_1_MainCustomer": _ANY_VALUE,
"field_2_MainCustomer": _ANY_VALUE,
},
id="keys replace config uses values from config",
),
pytest.param(
{
"date time": _ANY_VALUE,
"user id": _ANY_VALUE,
"customer": {
"customer name": _ANY_VALUE,
"customer id": _ANY_VALUE,
"contact info": {"email": _ANY_VALUE, "phone number": _ANY_VALUE},
},
},
{},
{},
{},
{"old": " ", "new": "_"},
{
"customer": {
"contact_info": {"email": _ANY_VALUE, "phone_number": _ANY_VALUE},
"customer_id": _ANY_VALUE,
"customer_name": _ANY_VALUE,
},
"date_time": _ANY_VALUE,
"user_id": _ANY_VALUE,
},
id="simple keys replace config with nested fields in record",
),
],
)
def test_transform(
input_record, config, stream_state, stream_slice, keys_replace_config, expected_record
):
KeysReplaceTransformation(
old=keys_replace_config["old"], new=keys_replace_config["new"], parameters={}
).transform(
record=input_record, config=config, stream_state=stream_state, stream_slice=stream_slice
)
assert input_record == expected_record
Loading