From 7b50f09a6166a01ef2641fc2115b6cb15123f9fb Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 10:43:38 -0800 Subject: [PATCH 01/25] moved deserialize_json and added serialize_json to _util --- singer_sdk/_singerlib/messages.py | 21 +--------- singer_sdk/helpers/_util.py | 64 +++++++++++++++++++++++++++++++ singer_sdk/io_base.py | 26 +------------ tests/core/test_io.py | 8 ++-- 4 files changed, 71 insertions(+), 48 deletions(-) diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index f9a93b76b..4f45acb69 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -8,7 +8,7 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -import simplejson as json +from singer_sdk.helpers._util import serialize_json if sys.version_info < (3, 11): from backports.datetime_fromisoformat import MonkeyPatch @@ -26,18 +26,6 @@ class SingerMessageType(str, enum.Enum): BATCH = "BATCH" -def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 - """Default JSON encoder. - - Args: - obj: The object to encode. - - Returns: - The encoded object. - """ - return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj) - - def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: """Exclude null values from a dictionary. @@ -226,12 +214,7 @@ def format_message(message: Message) -> str: Returns: The formatted message. """ - return json.dumps( - message.to_dict(), - use_decimal=True, - default=_default_encoding, - separators=(",", ":"), - ) + return serialize_json(message.to_dict()) def write_message(message: Message) -> None: diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index d0079c40d..e4bad57a7 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -2,11 +2,75 @@ from __future__ import annotations +import decimal import json +import logging +import sys import typing as t +from datetime import datetime from pathlib import Path, PurePath import pendulum +import simplejson as simjson + +if sys.version_info < (3, 11): + from backports.datetime_fromisoformat import MonkeyPatch + + MonkeyPatch.patch_fromisoformat() + + +logger = logging.getLogger(__name__) + + +def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 + """Default JSON encoder. + + Args: + obj: The object to encode. + + Returns: + The encoded object. + """ + return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj) + + +def deserialize_json(line: str) -> dict: + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + + Raises: + json.decoder.JSONDecodeError: raised if any lines are not valid json + """ + try: + return json.loads( # type: ignore[no-any-return] + line, + parse_float=decimal.Decimal, + ) + except json.decoder.JSONDecodeError as exc: + logger.error("Unable to parse:\n%s", line, exc_info=exc) + raise + + +def serialize_json(line_dict: dict) -> str: + """Serialize a dictionary into a line of json. + + Args: + line_dict: A Python dict. + + Returns: + A string of serialized json. + """ + return simjson.dumps( + line_dict, + use_decimal=True, + default=_default_encoding, + separators=(",", ":"), + ) def read_json_file(path: PurePath | str) -> dict[str, t.Any]: diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 447dbd1b9..4c26172c3 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -3,8 +3,6 @@ from __future__ import annotations import abc -import decimal -import json import logging import sys import typing as t @@ -13,6 +11,7 @@ from singer_sdk._singerlib.messages import Message, SingerMessageType from singer_sdk._singerlib.messages import format_message as singer_format_message from singer_sdk._singerlib.messages import write_message as singer_write_message +from singer_sdk.helpers._util import deserialize_json logger = logging.getLogger(__name__) @@ -51,27 +50,6 @@ def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" raise Exception(msg) # TODO: Raise a more specific exception - def deserialize_json(self, line: str) -> dict: - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - - Raises: - json.decoder.JSONDecodeError: raised if any lines are not valid json - """ - try: - return json.loads( # type: ignore[no-any-return] - line, - parse_float=decimal.Decimal, - ) - except json.decoder.JSONDecodeError as exc: - logger.error("Unable to parse:\n%s", line, exc_info=exc) - raise - def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: """Internal method to process jsonl lines from a Singer tap. @@ -83,7 +61,7 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: """ stats: dict[str, int] = defaultdict(int) for line in file_input: - line_dict = self.deserialize_json(line) + line_dict = deserialize_json(line) self._assert_line_requires(line_dict, requires={"type"}) record_type: SingerMessageType = line_dict["type"] diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 0fcce614b..ecc52fac1 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -10,6 +10,7 @@ import pytest from singer_sdk._singerlib import RecordMessage +from singer_sdk.helpers._util import deserialize_json from singer_sdk.io_base import SingerReader, SingerWriter @@ -52,9 +53,8 @@ def _process_state_message(self, message_dict: dict) -> None: ], ) def test_deserialize(line, expected, exception): - reader = DummyReader() with exception: - assert reader.deserialize_json(line) == expected + assert deserialize_json(line) == expected # Benchmark Tests @@ -104,10 +104,8 @@ def test_bench_deserialize_json(benchmark, bench_encoded_record): """Run benchmark for Sink._validator method validate.""" number_of_runs = 1000 - reader = DummyReader() - def run_deserialize_json(): for record in itertools.repeat(bench_encoded_record, number_of_runs): - reader.deserialize_json(record) + deserialize_json(record) benchmark(run_deserialize_json) From e73908391ec7b2930d17e0568403fa5a3bbfd82b Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 14:30:26 -0800 Subject: [PATCH 02/25] add serialize_json to _flatten_record --- singer_sdk/helpers/_flattening.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index 2a3e194d0..3a0f63408 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -9,7 +9,8 @@ from copy import deepcopy import inflection -import simplejson as json + +from singer_sdk.helpers._util import serialize_json DEFAULT_FLATTENING_SEPARATOR = "__" @@ -437,7 +438,7 @@ def _flatten_record( items.append( ( new_key, - json.dumps(v, use_decimal=True, default=str) + serialize_json(v) if _should_jsondump_value(k, v, flattened_schema) else v, ), From 52f8f05931e1cadc61c55f7b97894fa111c76f81 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 14:36:59 -0800 Subject: [PATCH 03/25] updated test_flattening_record --- tests/core/test_flattening.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/test_flattening.py b/tests/core/test_flattening.py index 73169eab3..1e0466986 100644 --- a/tests/core/test_flattening.py +++ b/tests/core/test_flattening.py @@ -20,7 +20,7 @@ { "key_1": 1, "key_2__key_3": "value", - "key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}', + "key_2__key_4": '{"key_5":1,"key_6":["a","b"]}', }, id="flattened schema limiting the max level", ), @@ -38,7 +38,7 @@ "key_1": 1, "key_2__key_3": "value", "key_2__key_4__key_5": 1, - "key_2__key_4__key_6": '["a", "b"]', + "key_2__key_4__key_6": '["a","b"]', }, id="flattened schema not limiting the max level", ), @@ -55,7 +55,7 @@ { "key_1": 1, "key_2__key_3": "value", - "key_2__key_4": '{"key_5": 1, "key_6": ["a", "b"]}', + "key_2__key_4": '{"key_5":1,"key_6":["a","b"]}', }, id="max level limiting flattened schema", ), From 209fc7d6e44246372310a05ca3d611fe0548cf69 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 14:41:51 -0800 Subject: [PATCH 04/25] update flattening related snapshots --- tests/snapshots/mapped_stream/flatten_all.jsonl | 6 +++--- tests/snapshots/mapped_stream/flatten_depth_1.jsonl | 6 +++--- tests/snapshots/mapped_stream/map_and_flatten.jsonl | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/snapshots/mapped_stream/flatten_all.jsonl b/tests/snapshots/mapped_stream/flatten_all.jsonl index 21504a38f..79f981ac5 100644 --- a/tests/snapshots/mapped_stream/flatten_all.jsonl +++ b/tests/snapshots/mapped_stream/flatten_all.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} {"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub__num":{"type":["integer","null"]},"user__sub__custom_obj":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]}},"type":"object"},"key_properties":[]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14, 2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32, 1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414, 1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14,2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32,1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414,1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl index 317008dd8..4dd18f86d 100644 --- a/tests/snapshots/mapped_stream/flatten_depth_1.jsonl +++ b/tests/snapshots/mapped_stream/flatten_depth_1.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} {"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]}},"type":"object"},"key_properties":[]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub":"{\"num\": 1, \"custom_obj\": \"obj-hello\"}","user__some_numbers":"[3.14, 2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub":"{\"num\": 2, \"custom_obj\": \"obj-world\"}","user__some_numbers":"[10.32, 1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub":"{\"num\": 3, \"custom_obj\": \"obj-hello\"}","user__some_numbers":"[1.414, 1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub":"{\"num\":1,\"custom_obj\":\"obj-hello\"}","user__some_numbers":"[3.14,2.718]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub":"{\"num\":2,\"custom_obj\":\"obj-world\"}","user__some_numbers":"[10.32,1.618]"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub":"{\"num\":3,\"custom_obj\":\"obj-hello\"}","user__some_numbers":"[1.414,1.732]"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}} diff --git a/tests/snapshots/mapped_stream/map_and_flatten.jsonl b/tests/snapshots/mapped_stream/map_and_flatten.jsonl index 89397a046..5bc3b7f42 100644 --- a/tests/snapshots/mapped_stream/map_and_flatten.jsonl +++ b/tests/snapshots/mapped_stream/map_and_flatten.jsonl @@ -1,6 +1,6 @@ {"type":"STATE","value":{}} {"type":"SCHEMA","stream":"mystream","schema":{"properties":{"email":{"type":["string","null"]},"count":{"type":["integer","null"]},"user__id":{"type":["integer","null"]},"user__sub__num":{"type":["integer","null"]},"user__sub__custom_obj":{"type":["string","null"]},"user__some_numbers":{"type":["string","null"]},"email_hash":{"type":["string","null"]}},"type":"object"},"key_properties":["email_hash"]} -{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14, 2.718]","email_hash":"c160f8cc69a4f0bf2b0362752353d060"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32, 1.618]","email_hash":"4b9bb80620f03eb3719e0a061c14283d"},"time_extracted":"2022-01-01T00:00:00+00:00"} -{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414, 1.732]","email_hash":"426b189df1e2f359efe6ee90f2d2030f"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"alice@example.com","count":21,"user__id":1,"user__sub__num":1,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[3.14,2.718]","email_hash":"c160f8cc69a4f0bf2b0362752353d060"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"bob@example.com","count":13,"user__id":2,"user__sub__num":2,"user__sub__custom_obj":"obj-world","user__some_numbers":"[10.32,1.618]","email_hash":"4b9bb80620f03eb3719e0a061c14283d"},"time_extracted":"2022-01-01T00:00:00+00:00"} +{"type":"RECORD","stream":"mystream","record":{"email":"charlie@example.com","count":19,"user__id":3,"user__sub__num":3,"user__sub__custom_obj":"obj-hello","user__some_numbers":"[1.414,1.732]","email_hash":"426b189df1e2f359efe6ee90f2d2030f"},"time_extracted":"2022-01-01T00:00:00+00:00"} {"type":"STATE","value":{"bookmarks":{"mystream":{}}}} From c8d594bd031a6dcdbcc199c8bc2fcf5b841985dd Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 15:18:16 -0800 Subject: [PATCH 05/25] updated Tap.catalog_json_text to use serialize_json --- singer_sdk/tap_base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index caf016cd0..715460916 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -5,7 +5,6 @@ import abc import contextlib -import json import typing as t from enum import Enum @@ -21,7 +20,7 @@ from singer_sdk.helpers import _state from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._state import write_stream_state -from singer_sdk.helpers._util import read_json_file +from singer_sdk.helpers._util import read_json_file, serialize_json from singer_sdk.helpers.capabilities import ( BATCH_CONFIG, CapabilitiesEnum, @@ -313,7 +312,7 @@ def catalog_json_text(self) -> str: Returns: The tap's catalog as formatted JSON text. """ - return json.dumps(self.catalog_dict, indent=2) + return serialize_json(self.catalog_dict, indent=2) @property def _singer_catalog(self) -> Catalog: From 417000c5a9f134f5768a0bda3a422ba4ba40bdeb Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 15:20:03 -0800 Subject: [PATCH 06/25] updated SQLConnector serialize_json and desrialize_json to use _util verson --- singer_sdk/connectors/sql.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index d3cb01b91..6706b62bd 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -2,8 +2,6 @@ from __future__ import annotations -import decimal -import json import logging import typing as t import warnings @@ -11,12 +9,17 @@ from datetime import datetime from functools import lru_cache -import simplejson import sqlalchemy as sa from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers._util import ( + deserialize_json as util_deserialize_json, +) +from singer_sdk.helpers._util import ( + serialize_json as util_serialize_json, +) from singer_sdk.helpers.capabilities import TargetLoadMethods if t.TYPE_CHECKING: @@ -1173,7 +1176,7 @@ def serialize_json(self, obj: object) -> str: .. versionadded:: 0.31.0 """ - return simplejson.dumps(obj, use_decimal=True) + return util_serialize_json(obj) def deserialize_json(self, json_str: str) -> object: """Deserialize a JSON string to an object. @@ -1189,7 +1192,7 @@ def deserialize_json(self, json_str: str) -> object: .. versionadded:: 0.31.0 """ - return json.loads(json_str, parse_float=decimal.Decimal) + return util_deserialize_json(json_str) def delete_old_versions( self, From 20260c645d99f18a32f20d5147b6baf4fac67791 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 15:21:15 -0800 Subject: [PATCH 07/25] updated JSONLinesBatcher.get_batches to use serialize_json --- singer_sdk/contrib/batch_encoder_jsonl.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 6ce4c8793..262b840df 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -3,11 +3,11 @@ from __future__ import annotations import gzip -import json import typing as t from uuid import uuid4 from singer_sdk.batch import BaseBatcher, lazy_chunked_generator +from singer_sdk.helpers._util import serialize_json __all__ = ["JSONLinesBatcher"] @@ -45,8 +45,7 @@ def get_batches( mode="wb", ) as gz: gz.writelines( - (json.dumps(record, default=str) + "\n").encode() - for record in chunk + (serialize_json(record) + "\n").encode() for record in chunk ) file_url = fs.geturl(filename) yield [file_url] From 29b9903414c56d75887c9db5556d1b49f1781b6b Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 15:47:51 -0800 Subject: [PATCH 08/25] added kwargs to serde functions, updated read_json_file to use deserialize_json --- singer_sdk/helpers/_util.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index e4bad57a7..5a86b95f4 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -11,7 +11,7 @@ from pathlib import Path, PurePath import pendulum -import simplejson as simjson +import simplejson if sys.version_info < (3, 11): from backports.datetime_fromisoformat import MonkeyPatch @@ -34,11 +34,12 @@ def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj) -def deserialize_json(line: str) -> dict: +def deserialize_json(line: str, **kwargs: t.Any) -> dict: """Deserialize a line of json. Args: line: A single line of json. + **kwargs: Optional key word arguments. Returns: A dictionary of the deserialized json. @@ -50,26 +51,29 @@ def deserialize_json(line: str) -> dict: return json.loads( # type: ignore[no-any-return] line, parse_float=decimal.Decimal, + **kwargs, ) except json.decoder.JSONDecodeError as exc: logger.error("Unable to parse:\n%s", line, exc_info=exc) raise -def serialize_json(line_dict: dict) -> str: +def serialize_json(line_dict: dict, **kwargs: t.Any) -> str: """Serialize a dictionary into a line of json. Args: line_dict: A Python dict. + **kwargs: Optional key word arguments. Returns: A string of serialized json. """ - return simjson.dumps( + return simplejson.dumps( line_dict, use_decimal=True, default=_default_encoding, separators=(",", ":"), + **kwargs, ) @@ -86,7 +90,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return t.cast(dict, json.loads(Path(path).read_text())) + return t.cast(dict, deserialize_json(Path(path).read_text())) def utc_now() -> pendulum.DateTime: From 9e4b69872ab2ad8b4052697d621e53386aa2baa6 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Tue, 20 Feb 2024 15:54:22 -0800 Subject: [PATCH 09/25] updated Sink.process_batch_files to use deserialize_json --- singer_sdk/sinks/core.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index e35353789..b07b651f5 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -6,7 +6,6 @@ import copy import datetime import importlib.util -import json import time import typing as t from functools import cached_property @@ -38,6 +37,7 @@ get_datelike_property_type, handle_invalid_timestamp_in_record, ) +from singer_sdk.helpers._util import deserialize_json if t.TYPE_CHECKING: from logging import Logger @@ -690,7 +690,9 @@ def process_batch_files( context_file = ( gzip_open(file) if encoding.compression == "gzip" else file ) - context = {"records": [json.loads(line) for line in context_file]} # type: ignore[attr-defined] + context = { + "records": [deserialize_json(line) for line in context_file] + } # type: ignore[attr-defined] self.process_batch(context) elif ( importlib.util.find_spec("pyarrow") From 8dc53eab128b59624d1d337cf068505a3d50eb36 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 21 Feb 2024 08:46:55 -0800 Subject: [PATCH 10/25] changed attributes to match SQLConnector versions. --- singer_sdk/helpers/_util.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index 5a86b95f4..1fc171537 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -34,11 +34,11 @@ def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 return obj.isoformat(sep="T") if isinstance(obj, datetime) else str(obj) -def deserialize_json(line: str, **kwargs: t.Any) -> dict: +def deserialize_json(json_str: str, **kwargs: t.Any) -> dict: """Deserialize a line of json. Args: - line: A single line of json. + json_str: A single line of json. **kwargs: Optional key word arguments. Returns: @@ -49,27 +49,27 @@ def deserialize_json(line: str, **kwargs: t.Any) -> dict: """ try: return json.loads( # type: ignore[no-any-return] - line, + json_str, parse_float=decimal.Decimal, **kwargs, ) except json.decoder.JSONDecodeError as exc: - logger.error("Unable to parse:\n%s", line, exc_info=exc) + logger.error("Unable to parse:\n%s", json_str, exc_info=exc) raise -def serialize_json(line_dict: dict, **kwargs: t.Any) -> str: +def serialize_json(obj: object, **kwargs: t.Any) -> str: """Serialize a dictionary into a line of json. Args: - line_dict: A Python dict. + obj: A Python object usually a dict. **kwargs: Optional key word arguments. Returns: A string of serialized json. """ return simplejson.dumps( - line_dict, + obj, use_decimal=True, default=_default_encoding, separators=(",", ":"), @@ -90,7 +90,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return t.cast(dict, deserialize_json(Path(path).read_text())) + return deserialize_json(Path(path).read_text()) def utc_now() -> pendulum.DateTime: From efcc71e7c4974fe46f9a9c4b3195b2adc005897e Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Wed, 21 Feb 2024 09:13:51 -0800 Subject: [PATCH 11/25] moved type: ignore to correct line --- singer_sdk/sinks/core.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index b07b651f5..ade5234c2 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -691,8 +691,8 @@ def process_batch_files( gzip_open(file) if encoding.compression == "gzip" else file ) context = { - "records": [deserialize_json(line) for line in context_file] - } # type: ignore[attr-defined] + "records": [deserialize_json(line) for line in context_file] # type: ignore[attr-defined] + } self.process_batch(context) elif ( importlib.util.find_spec("pyarrow") From 04d9370ed7458503d00aeb3d7a3c7a50c82b73f0 Mon Sep 17 00:00:00 2001 From: Dan Norman Date: Mon, 24 Jun 2024 13:50:34 -0700 Subject: [PATCH 12/25] applied a mypy suggestion by removing redundant dictionary cast --- singer_sdk/helpers/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index 6ed16dad3..bf910f4f9 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -90,7 +90,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return t.cast(dict, deserialize_json(Path(path).read_text(encoding="utf-8"))) + return deserialize_json(Path(path).read_text(encoding="utf-8")) def utc_now() -> datetime.datetime: From 5cfa21edf81b9d9e1e44ec022632fb7f5c44445e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 15:41:49 -0600 Subject: [PATCH 13/25] Move base SerDe to _singerlib --- singer_sdk/_singerlib/messages.py | 2 +- singer_sdk/_singerlib/serde.py | 66 +++++++++++++++++++++ singer_sdk/connectors/sql.py | 12 +--- singer_sdk/contrib/batch_encoder_jsonl.py | 2 +- singer_sdk/helpers/_flattening.py | 2 +- singer_sdk/helpers/_util.py | 70 +---------------------- singer_sdk/io_base.py | 2 +- singer_sdk/sinks/core.py | 2 +- singer_sdk/tap_base.py | 3 +- tests/core/test_io.py | 2 +- 10 files changed, 78 insertions(+), 85 deletions(-) create mode 100644 singer_sdk/_singerlib/serde.py diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index 4f45acb69..2131290b6 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -8,7 +8,7 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -from singer_sdk.helpers._util import serialize_json +from singer_sdk._singerlib.serde import serialize_json if sys.version_info < (3, 11): from backports.datetime_fromisoformat import MonkeyPatch diff --git a/singer_sdk/_singerlib/serde.py b/singer_sdk/_singerlib/serde.py new file mode 100644 index 000000000..cd0cfce2e --- /dev/null +++ b/singer_sdk/_singerlib/serde.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +import datetime +import decimal +import json +import logging +import typing as t + +import simplejson + +logger = logging.getLogger(__name__) + + +def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 + """Default JSON encoder. + + Args: + obj: The object to encode. + + Returns: + The encoded object. + """ + return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj) + + +def deserialize_json(json_str: str, **kwargs: t.Any) -> dict: + """Deserialize a line of json. + + Args: + json_str: A single line of json. + **kwargs: Optional key word arguments. + + Returns: + A dictionary of the deserialized json. + + Raises: + json.decoder.JSONDecodeError: raised if any lines are not valid json + """ + try: + return json.loads( # type: ignore[no-any-return] + json_str, + parse_float=decimal.Decimal, + **kwargs, + ) + except json.decoder.JSONDecodeError as exc: + logger.exception("Unable to parse:\n%s", json_str, exc_info=exc) + raise + + +def serialize_json(obj: object, **kwargs: t.Any) -> str: + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + **kwargs: Optional key word arguments. + + Returns: + A string of serialized json. + """ + return simplejson.dumps( + obj, + use_decimal=True, + default=_default_encoding, + separators=(",", ":"), + **kwargs, + ) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 652246e06..0a9a2f90a 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -12,14 +12,8 @@ import sqlalchemy as sa from singer_sdk import typing as th -from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema +from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema, serde from singer_sdk.exceptions import ConfigValidationError -from singer_sdk.helpers._util import ( - deserialize_json as util_deserialize_json, -) -from singer_sdk.helpers._util import ( - serialize_json as util_serialize_json, -) from singer_sdk.helpers.capabilities import TargetLoadMethods if t.TYPE_CHECKING: @@ -1170,7 +1164,7 @@ def serialize_json(self, obj: object) -> str: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return util_serialize_json(obj) + return serde.serialize_json(obj) def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 """Deserialize a JSON string to an object. @@ -1186,7 +1180,7 @@ def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return util_deserialize_json(json_str) + return serde.deserialize_json(json_str) def delete_old_versions( self, diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 262b840df..7c9505de4 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -6,8 +6,8 @@ import typing as t from uuid import uuid4 +from singer_sdk._singerlib.serde import serialize_json from singer_sdk.batch import BaseBatcher, lazy_chunked_generator -from singer_sdk.helpers._util import serialize_json __all__ = ["JSONLinesBatcher"] diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index edd262f98..c1999fcc6 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -10,7 +10,7 @@ import inflection -from singer_sdk.helpers._util import serialize_json +from singer_sdk._singerlib.serde import serialize_json DEFAULT_FLATTENING_SEPARATOR = "__" diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index 5f301fa16..0e8250c2a 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -3,78 +3,10 @@ from __future__ import annotations import datetime -import decimal import json -import logging -import sys import typing as t from pathlib import Path, PurePath -import simplejson - -if sys.version_info < (3, 11): - from backports.datetime_fromisoformat import MonkeyPatch - - MonkeyPatch.patch_fromisoformat() - - -logger = logging.getLogger(__name__) - - -def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 - """Default JSON encoder. - - Args: - obj: The object to encode. - - Returns: - The encoded object. - """ - return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj) - - -def deserialize_json(json_str: str, **kwargs: t.Any) -> dict: - """Deserialize a line of json. - - Args: - json_str: A single line of json. - **kwargs: Optional key word arguments. - - Returns: - A dictionary of the deserialized json. - - Raises: - json.decoder.JSONDecodeError: raised if any lines are not valid json - """ - try: - return json.loads( # type: ignore[no-any-return] - json_str, - parse_float=decimal.Decimal, - **kwargs, - ) - except json.decoder.JSONDecodeError as exc: - logger.exception("Unable to parse:\n%s", json_str, exc_info=exc) - raise - - -def serialize_json(obj: object, **kwargs: t.Any) -> str: - """Serialize a dictionary into a line of json. - - Args: - obj: A Python object usually a dict. - **kwargs: Optional key word arguments. - - Returns: - A string of serialized json. - """ - return simplejson.dumps( - obj, - use_decimal=True, - default=_default_encoding, - separators=(",", ":"), - **kwargs, - ) - def read_json_file(path: PurePath | str) -> dict[str, t.Any]: """Read json file, throwing an error if missing.""" @@ -89,7 +21,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return deserialize_json(Path(path).read_text(encoding="utf-8")) + return t.cast(dict, json.loads(Path(path).read_text(encoding="utf-8"))) def utc_now() -> datetime.datetime: diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 33e045f61..f0abc568b 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -11,8 +11,8 @@ from singer_sdk._singerlib.messages import Message, SingerMessageType from singer_sdk._singerlib.messages import format_message as singer_format_message from singer_sdk._singerlib.messages import write_message as singer_write_message +from singer_sdk._singerlib.serde import deserialize_json from singer_sdk.exceptions import InvalidInputLine -from singer_sdk.helpers._util import deserialize_json logger = logging.getLogger(__name__) diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 44dacc5cd..4fb475f8b 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -16,6 +16,7 @@ import jsonschema from typing_extensions import override +from singer_sdk._singerlib.serde import deserialize_json from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, @@ -37,7 +38,6 @@ get_datelike_property_type, handle_invalid_timestamp_in_record, ) -from singer_sdk.helpers._util import deserialize_json if t.TYPE_CHECKING: from logging import Logger diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 256ad90e3..89076b6f1 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -10,6 +10,7 @@ import click from singer_sdk._singerlib import Catalog, StateMessage +from singer_sdk._singerlib.serde import serialize_json from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema from singer_sdk.exceptions import ( AbortedSyncFailedException, @@ -19,7 +20,7 @@ from singer_sdk.helpers import _state from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._state import write_stream_state -from singer_sdk.helpers._util import read_json_file, serialize_json +from singer_sdk.helpers._util import read_json_file from singer_sdk.helpers.capabilities import ( BATCH_CONFIG, CapabilitiesEnum, diff --git a/tests/core/test_io.py b/tests/core/test_io.py index ecc52fac1..9ec9532f2 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -10,7 +10,7 @@ import pytest from singer_sdk._singerlib import RecordMessage -from singer_sdk.helpers._util import deserialize_json +from singer_sdk._singerlib.serde import deserialize_json from singer_sdk.io_base import SingerReader, SingerWriter From b9faa7fcaf88b5278d8be178f575f0bb37ed2af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 16:39:46 -0600 Subject: [PATCH 14/25] Re-add `SingerReader.deserialize_json` --- singer_sdk/io_base.py | 13 ++++++++++++- tests/core/test_io.py | 8 +++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index f0abc568b..1834b6fad 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -51,6 +51,17 @@ def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" raise InvalidInputLine(msg) + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + """ + return deserialize_json(line) + def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: """Internal method to process jsonl lines from a Singer tap. @@ -62,7 +73,7 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: """ stats: dict[str, int] = defaultdict(int) for line in file_input: - line_dict = deserialize_json(line) + line_dict = self.deserialize_json(line) self._assert_line_requires(line_dict, requires={"type"}) record_type: SingerMessageType = line_dict["type"] diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 9ec9532f2..0fcce614b 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -10,7 +10,6 @@ import pytest from singer_sdk._singerlib import RecordMessage -from singer_sdk._singerlib.serde import deserialize_json from singer_sdk.io_base import SingerReader, SingerWriter @@ -53,8 +52,9 @@ def _process_state_message(self, message_dict: dict) -> None: ], ) def test_deserialize(line, expected, exception): + reader = DummyReader() with exception: - assert deserialize_json(line) == expected + assert reader.deserialize_json(line) == expected # Benchmark Tests @@ -104,8 +104,10 @@ def test_bench_deserialize_json(benchmark, bench_encoded_record): """Run benchmark for Sink._validator method validate.""" number_of_runs = 1000 + reader = DummyReader() + def run_deserialize_json(): for record in itertools.repeat(bench_encoded_record, number_of_runs): - deserialize_json(record) + reader.deserialize_json(record) benchmark(run_deserialize_json) From dc5c7a1764f4703ce6ae9ba08cf646e6142a74b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 17:14:09 -0600 Subject: [PATCH 15/25] Implement naive central JSON loading and dumping functions --- singer_sdk/helpers/_util.py | 47 +++++++++++++++++++++++++++++++++++-- singer_sdk/tap_base.py | 5 ++-- 2 files changed, 47 insertions(+), 5 deletions(-) diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index 0e8250c2a..d4572a5f5 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -3,10 +3,53 @@ from __future__ import annotations import datetime -import json +import decimal import typing as t from pathlib import Path, PurePath +import simplejson + + +def dump_json(obj: t.Any, **kwargs: t.Any) -> str: # noqa: ANN401 + """Dump json data to a file. + + Args: + obj: A Python object, usually a dict. + **kwargs: Optional key word arguments. + + Returns: + A string of serialized json. + + .. warning:: Do not use this function to serialize Singer messages or bulk data. + Use the functions in ``singer_sdk._singerlib.serde`` instead. + """ + return simplejson.dumps( + obj, + use_decimal=True, + separators=(",", ":"), + **kwargs, + ) + + +def load_json(json_str: str, **kwargs: t.Any) -> dict: + """Load json data from a file. + + Args: + json_str: A valid JSON string. + **kwargs: Optional key word arguments. + + Returns: + A Python object, usually a dict. + + .. warning:: Do not use this function to parse Singer messages or bulk data. + Use the functions in ``singer_sdk._singerlib.serde`` instead. + """ + return simplejson.loads( # type: ignore[no-any-return] + json_str, + parse_float=decimal.Decimal, + **kwargs, + ) + def read_json_file(path: PurePath | str) -> dict[str, t.Any]: """Read json file, throwing an error if missing.""" @@ -21,7 +64,7 @@ def read_json_file(path: PurePath | str) -> dict[str, t.Any]: msg += f"\nFor more info, please see the sample template at: {template}" raise FileExistsError(msg) - return t.cast(dict, json.loads(Path(path).read_text(encoding="utf-8"))) + return load_json(Path(path).read_text(encoding="utf-8")) def utc_now() -> datetime.datetime: diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index 89076b6f1..d69fa5f38 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -10,7 +10,6 @@ import click from singer_sdk._singerlib import Catalog, StateMessage -from singer_sdk._singerlib.serde import serialize_json from singer_sdk.configuration._dict_config import merge_missing_config_jsonschema from singer_sdk.exceptions import ( AbortedSyncFailedException, @@ -20,7 +19,7 @@ from singer_sdk.helpers import _state from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._state import write_stream_state -from singer_sdk.helpers._util import read_json_file +from singer_sdk.helpers._util import dump_json, read_json_file from singer_sdk.helpers.capabilities import ( BATCH_CONFIG, CapabilitiesEnum, @@ -312,7 +311,7 @@ def catalog_json_text(self) -> str: Returns: The tap's catalog as formatted JSON text. """ - return serialize_json(self.catalog_dict, indent=2) + return dump_json(self.catalog_dict, indent=2) @property def _singer_catalog(self) -> Catalog: From 3f29a1f42e4370b3320c7a0358e06beace12037b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 17:22:59 -0600 Subject: [PATCH 16/25] Use util methods to SerDe to and from SQL --- singer_sdk/connectors/sql.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 0a9a2f90a..4d2dd4842 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -12,8 +12,9 @@ import sqlalchemy as sa from singer_sdk import typing as th -from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema, serde +from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers._util import dump_json, load_json from singer_sdk.helpers.capabilities import TargetLoadMethods if t.TYPE_CHECKING: @@ -1164,7 +1165,7 @@ def serialize_json(self, obj: object) -> str: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return serde.serialize_json(obj) + return dump_json(obj) def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 """Deserialize a JSON string to an object. @@ -1180,7 +1181,7 @@ def deserialize_json(self, json_str: str) -> object: # noqa: PLR6301 .. versionadded:: 0.31.0 """ - return serde.deserialize_json(json_str) + return load_json(json_str) def delete_old_versions( self, From 96525f1ce27273237416140e1a4114293ab26c57 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 18:19:25 -0600 Subject: [PATCH 17/25] Make the Singer writer and reader classes generic --- singer_sdk/_singerlib/serde.py | 2 +- singer_sdk/io_base.py | 124 +++++++++++++++++++++------------ 2 files changed, 79 insertions(+), 47 deletions(-) diff --git a/singer_sdk/_singerlib/serde.py b/singer_sdk/_singerlib/serde.py index cd0cfce2e..9971bc301 100644 --- a/singer_sdk/_singerlib/serde.py +++ b/singer_sdk/_singerlib/serde.py @@ -23,7 +23,7 @@ def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 return obj.isoformat(sep="T") if isinstance(obj, datetime.datetime) else str(obj) -def deserialize_json(json_str: str, **kwargs: t.Any) -> dict: +def deserialize_json(json_str: str | bytes, **kwargs: t.Any) -> dict: """Deserialize a line of json. Args: diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index 1834b6fad..ce7d09ee1 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -9,60 +9,30 @@ from collections import Counter, defaultdict from singer_sdk._singerlib.messages import Message, SingerMessageType -from singer_sdk._singerlib.messages import format_message as singer_format_message -from singer_sdk._singerlib.messages import write_message as singer_write_message -from singer_sdk._singerlib.serde import deserialize_json +from singer_sdk._singerlib.serde import deserialize_json, serialize_json from singer_sdk.exceptions import InvalidInputLine logger = logging.getLogger(__name__) +# TODO: Use to default to 'str' here +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) -class SingerReader(metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages from stdin.""" + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" @t.final - def listen(self, file_input: t.IO[str] | None = None) -> None: + def listen(self, file_input: t.IO[T] | None = None) -> None: """Read from input until all messages are processed. Args: file_input: Readable stream of messages. Defaults to standard in. - - This method is internal to the SDK and should not need to be overridden. """ - if not file_input: - file_input = sys.stdin - - self._process_lines(file_input) + self._process_lines(file_input or self.default_input) self._process_endofpipe() - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . - - Args: - line_dict: TODO - requires: TODO - - Raises: - InvalidInputLine: raised if any required keys are missing - """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise InvalidInputLine(msg) - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - """ - return deserialize_json(line) - - def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: """Internal method to process jsonl lines from a Singer tap. Args: @@ -99,6 +69,29 @@ def _process_lines(self, file_input: t.IO[str]) -> t.Counter[str]: return Counter(**stats) + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... # noqa: D102 + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check if dictionary . + + Args: + line_dict: TODO + requires: TODO + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... # noqa: D102 + @abc.abstractmethod def _process_schema_message(self, message_dict: dict) -> None: ... @@ -131,10 +124,27 @@ def _process_endofpipe(self) -> None: # noqa: PLR6301 logger.debug("End of pipe reached") -class SingerWriter: - """Interface for all plugins writting Singer messages to stdout.""" +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" + + default_input = sys.stdin + + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + """ + return deserialize_json(line) + + +class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" - def format_message(self, message: Message) -> str: # noqa: PLR6301 + def format_message(self, message: Message) -> T: """Format a message as a JSON string. Args: @@ -143,12 +153,34 @@ def format_message(self, message: Message) -> str: # noqa: PLR6301 Returns: The formatted message. """ - return singer_format_message(message) + return self.serialize_json(message.to_dict()) + + @abc.abstractmethod + def serialize_json(self, obj: object) -> T: ... # noqa: D102 + + @abc.abstractmethod + def write_message(self, message: Message) -> None: ... # noqa: D102 + + +class SingerWriter(GenericSingerWriter[str]): + """Interface for all plugins writing Singer messages to stdout.""" + + def serialize_json(self, obj: object) -> str: # noqa: PLR6301 + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + + Returns: + A string of serialized json. + """ + return serialize_json(obj) - def write_message(self, message: Message) -> None: # noqa: PLR6301 + def write_message(self, message: Message) -> None: """Write a message to stdout. Args: message: The message to write. """ - singer_write_message(message) + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush() From af9b5d56b5edeef2962040e20134e6f1a95c8715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:00:50 -0600 Subject: [PATCH 18/25] Move Singer IO to _singerlib --- .flake8 | 2 +- singer_sdk/_singerlib/__init__.py | 4 + singer_sdk/_singerlib/encoding/__init__.py | 12 ++ singer_sdk/_singerlib/encoding/base.py | 144 +++++++++++++++ singer_sdk/_singerlib/encoding/simple.py | 52 ++++++ singer_sdk/_singerlib/exceptions.py | 9 + singer_sdk/_singerlib/{serde.py => json.py} | 5 + singer_sdk/_singerlib/messages.py | 2 +- singer_sdk/contrib/batch_encoder_jsonl.py | 2 +- singer_sdk/exceptions.py | 8 +- singer_sdk/helpers/_flattening.py | 2 +- singer_sdk/io_base.py | 183 +------------------- singer_sdk/sinks/core.py | 2 +- tests/_singerlib/test_messages.py | 7 +- tests/core/test_io.py | 17 +- 15 files changed, 253 insertions(+), 198 deletions(-) create mode 100644 singer_sdk/_singerlib/encoding/__init__.py create mode 100644 singer_sdk/_singerlib/encoding/base.py create mode 100644 singer_sdk/_singerlib/encoding/simple.py create mode 100644 singer_sdk/_singerlib/exceptions.py rename singer_sdk/_singerlib/{serde.py => json.py} (96%) diff --git a/.flake8 b/.flake8 index 9311ae8d0..07b7824bd 100644 --- a/.flake8 +++ b/.flake8 @@ -1,7 +1,7 @@ [flake8] max-line-length = 88 exclude = cookiecutter -ignore = E, W +ignore = E, F, W per-file-ignores = # Don't require docstrings conventions in private modules singer_sdk/helpers/_*.py:DAR diff --git a/singer_sdk/_singerlib/__init__.py b/singer_sdk/_singerlib/__init__.py index bc0b4523b..8386399d6 100644 --- a/singer_sdk/_singerlib/__init__.py +++ b/singer_sdk/_singerlib/__init__.py @@ -1,5 +1,6 @@ from __future__ import annotations +from singer_sdk._singerlib import exceptions from singer_sdk._singerlib.catalog import ( Catalog, CatalogEntry, @@ -16,6 +17,7 @@ SingerMessageType, StateMessage, exclude_null_dict, + format_message, write_message, ) from singer_sdk._singerlib.schema import Schema, resolve_schema_references @@ -35,7 +37,9 @@ "SingerMessageType", "StateMessage", "StreamMetadata", + "exceptions", "exclude_null_dict", + "format_message", "resolve_schema_references", "strftime", "strptime_to_utc", diff --git a/singer_sdk/_singerlib/encoding/__init__.py b/singer_sdk/_singerlib/encoding/__init__.py new file mode 100644 index 000000000..7819a1452 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/__init__.py @@ -0,0 +1,12 @@ +from __future__ import annotations + +from .base import GenericSingerReader, GenericSingerWriter, SingerMessageType +from .simple import SingerReader, SingerWriter + +__all__ = [ + "GenericSingerReader", + "GenericSingerWriter", + "SingerMessageType", + "SingerReader", + "SingerWriter", +] diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/encoding/base.py new file mode 100644 index 000000000..91d8a1c08 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/base.py @@ -0,0 +1,144 @@ +"""Abstract base classes for all Singer messages IO operations.""" + +from __future__ import annotations + +import abc +import logging +import typing as t +from collections import Counter, defaultdict + +from singer_sdk._singerlib import exceptions +from singer_sdk._singerlib.messages import Message, SingerMessageType + +logger = logging.getLogger(__name__) + + +# TODO: Use to default to 'str' here +# https://peps.python.org/pep-0696/ +T = t.TypeVar("T", str, bytes) + + +class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins reading Singer messages as strings or bytes.""" + + @t.final + def listen(self, file_input: t.IO[T] | None = None) -> None: + """Read from input until all messages are processed. + + Args: + file_input: Readable stream of messages. Defaults to standard in. + """ + self._process_lines(file_input or self.default_input) + self._process_endofpipe() + + def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: + """Internal method to process jsonl lines from a Singer tap. + + Args: + file_input: Readable stream of messages, each on a separate line. + + Returns: + A counter object for the processed lines. + """ + stats: dict[str, int] = defaultdict(int) + for line in file_input: + line_dict = self.deserialize_json(line) + self._assert_line_requires(line_dict, requires={"type"}) + + record_type: SingerMessageType = line_dict["type"] + if record_type == SingerMessageType.SCHEMA: + self._process_schema_message(line_dict) + + elif record_type == SingerMessageType.RECORD: + self._process_record_message(line_dict) + + elif record_type == SingerMessageType.ACTIVATE_VERSION: + self._process_activate_version_message(line_dict) + + elif record_type == SingerMessageType.STATE: + self._process_state_message(line_dict) + + elif record_type == SingerMessageType.BATCH: + self._process_batch_message(line_dict) + + else: + self._process_unknown_message(line_dict) + + stats[record_type] += 1 + + return Counter(**stats) + + @property + @abc.abstractmethod + def default_input(self) -> t.IO[T]: ... + + @staticmethod + def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: + """Check if dictionary . + + Args: + line_dict: TODO + requires: TODO + + Raises: + InvalidInputLine: raised if any required keys are missing + """ + if not requires.issubset(line_dict): + missing = requires - set(line_dict) + msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" + raise exceptions.InvalidInputLine(msg) + + @abc.abstractmethod + def deserialize_json(self, line: T) -> dict: ... + + @abc.abstractmethod + def _process_schema_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_record_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_state_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_activate_version_message(self, message_dict: dict) -> None: ... + + @abc.abstractmethod + def _process_batch_message(self, message_dict: dict) -> None: ... + + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 + """Internal method to process unknown message types from a Singer tap. + + Args: + message_dict: Dictionary representation of the Singer message. + + Raises: + ValueError: raised if a message type is not recognized + """ + record_type = message_dict["type"] + msg = f"Unknown message type '{record_type}' in message." + raise ValueError(msg) + + def _process_endofpipe(self) -> None: # noqa: PLR6301 + logger.debug("End of pipe reached") + + +class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): + """Interface for all plugins writing Singer messages as strings or bytes.""" + + def format_message(self, message: Message) -> T: + """Format a message as a JSON string. + + Args: + message: The message to format. + + Returns: + The formatted message. + """ + return self.serialize_json(message.to_dict()) + + @abc.abstractmethod + def serialize_json(self, obj: object) -> T: ... + + @abc.abstractmethod + def write_message(self, message: Message) -> None: ... diff --git a/singer_sdk/_singerlib/encoding/simple.py b/singer_sdk/_singerlib/encoding/simple.py new file mode 100644 index 000000000..51d92db47 --- /dev/null +++ b/singer_sdk/_singerlib/encoding/simple.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import sys +import typing as t + +from singer_sdk._singerlib.json import deserialize_json, serialize_json + +from .base import GenericSingerReader, GenericSingerWriter + +if t.TYPE_CHECKING: + from singer_sdk._singerlib.messages import Message + + +class SingerReader(GenericSingerReader[str]): + """Base class for all plugins reading Singer messages as strings from stdin.""" + + default_input = sys.stdin + + def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 + """Deserialize a line of json. + + Args: + line: A single line of json. + + Returns: + A dictionary of the deserialized json. + """ + return deserialize_json(line) + + +class SingerWriter(GenericSingerWriter[str]): + """Interface for all plugins writing Singer messages to stdout.""" + + def serialize_json(self, obj: object) -> str: # noqa: PLR6301 + """Serialize a dictionary into a line of json. + + Args: + obj: A Python object usually a dict. + + Returns: + A string of serialized json. + """ + return serialize_json(obj) + + def write_message(self, message: Message) -> None: + """Write a message to stdout. + + Args: + message: The message to write. + """ + sys.stdout.write(self.format_message(message) + "\n") + sys.stdout.flush() diff --git a/singer_sdk/_singerlib/exceptions.py b/singer_sdk/_singerlib/exceptions.py new file mode 100644 index 000000000..e3726bb6a --- /dev/null +++ b/singer_sdk/_singerlib/exceptions.py @@ -0,0 +1,9 @@ +from __future__ import annotations + +__all__ = [ + "InvalidInputLine", +] + + +class InvalidInputLine(Exception): + """Raised when an input line is not a valid Singer message.""" diff --git a/singer_sdk/_singerlib/serde.py b/singer_sdk/_singerlib/json.py similarity index 96% rename from singer_sdk/_singerlib/serde.py rename to singer_sdk/_singerlib/json.py index 9971bc301..367238a4a 100644 --- a/singer_sdk/_singerlib/serde.py +++ b/singer_sdk/_singerlib/json.py @@ -10,6 +10,11 @@ logger = logging.getLogger(__name__) +__all__ = [ + "deserialize_json", + "serialize_json", +] + def _default_encoding(obj: t.Any) -> str: # noqa: ANN401 """Default JSON encoder. diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index 2131290b6..ae8977c9e 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -8,7 +8,7 @@ from dataclasses import asdict, dataclass, field from datetime import datetime, timezone -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json if sys.version_info < (3, 11): from backports.datetime_fromisoformat import MonkeyPatch diff --git a/singer_sdk/contrib/batch_encoder_jsonl.py b/singer_sdk/contrib/batch_encoder_jsonl.py index 7c9505de4..6f121f8d4 100644 --- a/singer_sdk/contrib/batch_encoder_jsonl.py +++ b/singer_sdk/contrib/batch_encoder_jsonl.py @@ -6,7 +6,7 @@ import typing as t from uuid import uuid4 -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json from singer_sdk.batch import BaseBatcher, lazy_chunked_generator __all__ = ["JSONLinesBatcher"] diff --git a/singer_sdk/exceptions.py b/singer_sdk/exceptions.py index 20ec7ae65..a766952f9 100644 --- a/singer_sdk/exceptions.py +++ b/singer_sdk/exceptions.py @@ -5,6 +5,8 @@ import abc import typing as t +from singer_sdk._singerlib.exceptions import InvalidInputLine # noqa: F401 + if t.TYPE_CHECKING: import requests @@ -137,11 +139,7 @@ class ConformedNameClashException(Exception): class MissingKeyPropertiesError(Exception): - """Raised when a recieved (and/or transformed) record is missing key properties.""" - - -class InvalidInputLine(Exception): - """Raised when an input line is not a valid Singer message.""" + """Raised when a received (and/or transformed) record is missing key properties.""" class InvalidJSONSchema(Exception): diff --git a/singer_sdk/helpers/_flattening.py b/singer_sdk/helpers/_flattening.py index c1999fcc6..79ca50fdc 100644 --- a/singer_sdk/helpers/_flattening.py +++ b/singer_sdk/helpers/_flattening.py @@ -10,7 +10,7 @@ import inflection -from singer_sdk._singerlib.serde import serialize_json +from singer_sdk._singerlib.json import serialize_json DEFAULT_FLATTENING_SEPARATOR = "__" diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index ce7d09ee1..f9041beea 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -2,185 +2,4 @@ from __future__ import annotations -import abc -import logging -import sys -import typing as t -from collections import Counter, defaultdict - -from singer_sdk._singerlib.messages import Message, SingerMessageType -from singer_sdk._singerlib.serde import deserialize_json, serialize_json -from singer_sdk.exceptions import InvalidInputLine - -logger = logging.getLogger(__name__) - -# TODO: Use to default to 'str' here -# https://peps.python.org/pep-0696/ -T = t.TypeVar("T", str, bytes) - - -class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins reading Singer messages as strings or bytes.""" - - @t.final - def listen(self, file_input: t.IO[T] | None = None) -> None: - """Read from input until all messages are processed. - - Args: - file_input: Readable stream of messages. Defaults to standard in. - """ - self._process_lines(file_input or self.default_input) - self._process_endofpipe() - - def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: - """Internal method to process jsonl lines from a Singer tap. - - Args: - file_input: Readable stream of messages, each on a separate line. - - Returns: - A counter object for the processed lines. - """ - stats: dict[str, int] = defaultdict(int) - for line in file_input: - line_dict = self.deserialize_json(line) - self._assert_line_requires(line_dict, requires={"type"}) - - record_type: SingerMessageType = line_dict["type"] - if record_type == SingerMessageType.SCHEMA: - self._process_schema_message(line_dict) - - elif record_type == SingerMessageType.RECORD: - self._process_record_message(line_dict) - - elif record_type == SingerMessageType.ACTIVATE_VERSION: - self._process_activate_version_message(line_dict) - - elif record_type == SingerMessageType.STATE: - self._process_state_message(line_dict) - - elif record_type == SingerMessageType.BATCH: - self._process_batch_message(line_dict) - - else: - self._process_unknown_message(line_dict) - - stats[record_type] += 1 - - return Counter(**stats) - - @property - @abc.abstractmethod - def default_input(self) -> t.IO[T]: ... # noqa: D102 - - @staticmethod - def _assert_line_requires(line_dict: dict, requires: set[str]) -> None: - """Check if dictionary . - - Args: - line_dict: TODO - requires: TODO - - Raises: - InvalidInputLine: raised if any required keys are missing - """ - if not requires.issubset(line_dict): - missing = requires - set(line_dict) - msg = f"Line is missing required {', '.join(missing)} key(s): {line_dict}" - raise InvalidInputLine(msg) - - @abc.abstractmethod - def deserialize_json(self, line: T) -> dict: ... # noqa: D102 - - @abc.abstractmethod - def _process_schema_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_record_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_state_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_activate_version_message(self, message_dict: dict) -> None: ... - - @abc.abstractmethod - def _process_batch_message(self, message_dict: dict) -> None: ... - - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 - """Internal method to process unknown message types from a Singer tap. - - Args: - message_dict: Dictionary representation of the Singer message. - - Raises: - ValueError: raised if a message type is not recognized - """ - record_type = message_dict["type"] - msg = f"Unknown message type '{record_type}' in message." - raise ValueError(msg) - - def _process_endofpipe(self) -> None: # noqa: PLR6301 - logger.debug("End of pipe reached") - - -class SingerReader(GenericSingerReader[str]): - """Base class for all plugins reading Singer messages as strings from stdin.""" - - default_input = sys.stdin - - def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 - """Deserialize a line of json. - - Args: - line: A single line of json. - - Returns: - A dictionary of the deserialized json. - """ - return deserialize_json(line) - - -class GenericSingerWriter(t.Generic[T], metaclass=abc.ABCMeta): - """Interface for all plugins writing Singer messages as strings or bytes.""" - - def format_message(self, message: Message) -> T: - """Format a message as a JSON string. - - Args: - message: The message to format. - - Returns: - The formatted message. - """ - return self.serialize_json(message.to_dict()) - - @abc.abstractmethod - def serialize_json(self, obj: object) -> T: ... # noqa: D102 - - @abc.abstractmethod - def write_message(self, message: Message) -> None: ... # noqa: D102 - - -class SingerWriter(GenericSingerWriter[str]): - """Interface for all plugins writing Singer messages to stdout.""" - - def serialize_json(self, obj: object) -> str: # noqa: PLR6301 - """Serialize a dictionary into a line of json. - - Args: - obj: A Python object usually a dict. - - Returns: - A string of serialized json. - """ - return serialize_json(obj) - - def write_message(self, message: Message) -> None: - """Write a message to stdout. - - Args: - message: The message to write. - """ - sys.stdout.write(self.format_message(message) + "\n") - sys.stdout.flush() +from singer_sdk._singerlib.encoding import * # noqa: F403 diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index 4fb475f8b..53533d58b 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -16,7 +16,7 @@ import jsonschema from typing_extensions import override -from singer_sdk._singerlib.serde import deserialize_json +from singer_sdk._singerlib.json import deserialize_json from singer_sdk.exceptions import ( InvalidJSONSchema, InvalidRecord, diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py index 491573545..b433e9610 100644 --- a/tests/_singerlib/test_messages.py +++ b/tests/_singerlib/test_messages.py @@ -8,7 +8,6 @@ from pytz import timezone import singer_sdk._singerlib as singer -from singer_sdk.io_base import SingerWriter UTC = datetime.timezone.utc @@ -19,24 +18,22 @@ def test_exclude_null_dict(): def test_format_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) - assert singerwriter.format_message(message) == ( + assert singer.format_message(message) == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}' ) def test_write_message(): - singerwriter = SingerWriter() message = singer.RecordMessage( stream="test", record={"id": 1, "name": "test"}, ) with redirect_stdout(io.StringIO()) as out: - singerwriter.write_message(message) + singer.write_message(message) assert out.getvalue() == ( '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 0fcce614b..39644cce7 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -3,9 +3,10 @@ from __future__ import annotations import decimal +import io import itertools import json -from contextlib import nullcontext +from contextlib import nullcontext, redirect_stdout import pytest @@ -57,6 +58,20 @@ def test_deserialize(line, expected, exception): assert reader.deserialize_json(line) == expected +def test_write_message(): + writer = SingerWriter() + message = RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + ) + with redirect_stdout(io.StringIO()) as out: + writer.write_message(message) + + assert out.getvalue() == ( + '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' + ) + + # Benchmark Tests From f65e2e0c82c89da3285d68f3b9883e2551d91926 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:07:19 -0600 Subject: [PATCH 19/25] Handle uncovered code --- singer_sdk/_singerlib/encoding/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/encoding/base.py index 91d8a1c08..48b997084 100644 --- a/singer_sdk/_singerlib/encoding/base.py +++ b/singer_sdk/_singerlib/encoding/base.py @@ -58,11 +58,11 @@ def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: elif record_type == SingerMessageType.STATE: self._process_state_message(line_dict) - elif record_type == SingerMessageType.BATCH: + elif record_type == SingerMessageType.BATCH: # pragma: no cover self._process_batch_message(line_dict) else: - self._process_unknown_message(line_dict) + self._process_unknown_message(line_dict) # pragma: no cover stats[record_type] += 1 @@ -106,7 +106,7 @@ def _process_activate_version_message(self, message_dict: dict) -> None: ... @abc.abstractmethod def _process_batch_message(self, message_dict: dict) -> None: ... - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 # pragma: no cover """Internal method to process unknown message types from a Singer tap. Args: From 05d1ac10d3c215d84dba278824299a2ddd92b9f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:13:45 -0600 Subject: [PATCH 20/25] Update docstrings --- singer_sdk/helpers/_util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/singer_sdk/helpers/_util.py b/singer_sdk/helpers/_util.py index d4572a5f5..308fd7a30 100644 --- a/singer_sdk/helpers/_util.py +++ b/singer_sdk/helpers/_util.py @@ -21,7 +21,7 @@ def dump_json(obj: t.Any, **kwargs: t.Any) -> str: # noqa: ANN401 A string of serialized json. .. warning:: Do not use this function to serialize Singer messages or bulk data. - Use the functions in ``singer_sdk._singerlib.serde`` instead. + Use the functions in ``singer_sdk._singerlib.json`` instead. """ return simplejson.dumps( obj, @@ -42,7 +42,7 @@ def load_json(json_str: str, **kwargs: t.Any) -> dict: A Python object, usually a dict. .. warning:: Do not use this function to parse Singer messages or bulk data. - Use the functions in ``singer_sdk._singerlib.serde`` instead. + Use the functions in ``singer_sdk._singerlib.json`` instead. """ return simplejson.loads( # type: ignore[no-any-return] json_str, From ac7be3862d63bb0a9734e66089f3c8ced5dff2a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:29:28 -0600 Subject: [PATCH 21/25] Move Singer exception catching to reader implementation This way we can catch something like `msgspec.DecodeError` and raise the more generic `InvalidInputLine`. --- singer_sdk/_singerlib/encoding/simple.py | 15 ++++++++++++++- singer_sdk/_singerlib/json.py | 20 +++++--------------- tests/core/test_io.py | 3 ++- 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/singer_sdk/_singerlib/encoding/simple.py b/singer_sdk/_singerlib/encoding/simple.py index 51d92db47..636a8f981 100644 --- a/singer_sdk/_singerlib/encoding/simple.py +++ b/singer_sdk/_singerlib/encoding/simple.py @@ -1,8 +1,11 @@ from __future__ import annotations +import json +import logging import sys import typing as t +from singer_sdk._singerlib.exceptions import InvalidInputLine from singer_sdk._singerlib.json import deserialize_json, serialize_json from .base import GenericSingerReader, GenericSingerWriter @@ -10,6 +13,8 @@ if t.TYPE_CHECKING: from singer_sdk._singerlib.messages import Message +logger = logging.getLogger(__name__) + class SingerReader(GenericSingerReader[str]): """Base class for all plugins reading Singer messages as strings from stdin.""" @@ -24,8 +29,16 @@ def deserialize_json(self, line: str) -> dict: # noqa: PLR6301 Returns: A dictionary of the deserialized json. + + Raises: + InvalidInputLine: If the line is not valid JSON. """ - return deserialize_json(line) + try: + return deserialize_json(line) + except json.decoder.JSONDecodeError as exc: + logger.exception("Unable to parse:\n%s", line) + msg = f"Unable to parse line as JSON: {line}" + raise InvalidInputLine(msg) from exc class SingerWriter(GenericSingerWriter[str]): diff --git a/singer_sdk/_singerlib/json.py b/singer_sdk/_singerlib/json.py index 367238a4a..acb94ad9c 100644 --- a/singer_sdk/_singerlib/json.py +++ b/singer_sdk/_singerlib/json.py @@ -3,13 +3,10 @@ import datetime import decimal import json -import logging import typing as t import simplejson -logger = logging.getLogger(__name__) - __all__ = [ "deserialize_json", "serialize_json", @@ -37,19 +34,12 @@ def deserialize_json(json_str: str | bytes, **kwargs: t.Any) -> dict: Returns: A dictionary of the deserialized json. - - Raises: - json.decoder.JSONDecodeError: raised if any lines are not valid json """ - try: - return json.loads( # type: ignore[no-any-return] - json_str, - parse_float=decimal.Decimal, - **kwargs, - ) - except json.decoder.JSONDecodeError as exc: - logger.exception("Unable to parse:\n%s", json_str, exc_info=exc) - raise + return json.loads( # type: ignore[no-any-return] + json_str, + parse_float=decimal.Decimal, + **kwargs, + ) def serialize_json(obj: object, **kwargs: t.Any) -> str: diff --git a/tests/core/test_io.py b/tests/core/test_io.py index 39644cce7..e68353b12 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -11,6 +11,7 @@ import pytest from singer_sdk._singerlib import RecordMessage +from singer_sdk._singerlib.exceptions import InvalidInputLine from singer_sdk.io_base import SingerReader, SingerWriter @@ -37,7 +38,7 @@ def _process_state_message(self, message_dict: dict) -> None: pytest.param( "not-valid-json", None, - pytest.raises(json.decoder.JSONDecodeError), + pytest.raises(InvalidInputLine), id="unparsable", ), pytest.param( From 168485cec70412512e8a7544c2090731d6229686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 19:45:50 -0600 Subject: [PATCH 22/25] Increase encoder test coverage --- singer_sdk/_singerlib/encoding/base.py | 6 +++--- tests/core/test_io.py | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/encoding/base.py index 48b997084..91d8a1c08 100644 --- a/singer_sdk/_singerlib/encoding/base.py +++ b/singer_sdk/_singerlib/encoding/base.py @@ -58,11 +58,11 @@ def _process_lines(self, file_input: t.IO[T]) -> t.Counter[str]: elif record_type == SingerMessageType.STATE: self._process_state_message(line_dict) - elif record_type == SingerMessageType.BATCH: # pragma: no cover + elif record_type == SingerMessageType.BATCH: self._process_batch_message(line_dict) else: - self._process_unknown_message(line_dict) # pragma: no cover + self._process_unknown_message(line_dict) stats[record_type] += 1 @@ -106,7 +106,7 @@ def _process_activate_version_message(self, message_dict: dict) -> None: ... @abc.abstractmethod def _process_batch_message(self, message_dict: dict) -> None: ... - def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 # pragma: no cover + def _process_unknown_message(self, message_dict: dict) -> None: # noqa: PLR6301 """Internal method to process unknown message types from a Singer tap. Args: diff --git a/tests/core/test_io.py b/tests/core/test_io.py index e68353b12..a48a785df 100644 --- a/tests/core/test_io.py +++ b/tests/core/test_io.py @@ -7,6 +7,7 @@ import itertools import json from contextlib import nullcontext, redirect_stdout +from textwrap import dedent import pytest @@ -59,6 +60,29 @@ def test_deserialize(line, expected, exception): assert reader.deserialize_json(line) == expected +def test_listen(): + reader = DummyReader() + input_lines = io.StringIO( + dedent("""\ + {"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} + {"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} + {"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} + """) # noqa: E501 + ) + reader.listen(input_lines) + + +def test_listen_unknown_message(): + reader = DummyReader() + input_lines = io.StringIO('{"type": "UNKNOWN"}\n') + with pytest.raises(ValueError, match="Unknown message type"): + reader.listen(input_lines) + + def test_write_message(): writer = SingerWriter() message = RecordMessage( From 8f390f63ac6b14593ff6ed069dd99563be3fcfa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 20:38:07 -0600 Subject: [PATCH 23/25] Re-use message writing and formatting logic --- singer_sdk/_singerlib/encoding/base.py | 199 +++++++++++++++++++- singer_sdk/_singerlib/messages.py | 249 +++---------------------- 2 files changed, 224 insertions(+), 224 deletions(-) diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/encoding/base.py index 91d8a1c08..798a4f6dd 100644 --- a/singer_sdk/_singerlib/encoding/base.py +++ b/singer_sdk/_singerlib/encoding/base.py @@ -3,12 +3,20 @@ from __future__ import annotations import abc +import enum import logging +import sys import typing as t from collections import Counter, defaultdict +from dataclasses import asdict, dataclass, field +from datetime import datetime, timezone from singer_sdk._singerlib import exceptions -from singer_sdk._singerlib.messages import Message, SingerMessageType + +if sys.version_info < (3, 11): + from backports.datetime_fromisoformat import MonkeyPatch + + MonkeyPatch.patch_fromisoformat() logger = logging.getLogger(__name__) @@ -18,6 +26,195 @@ T = t.TypeVar("T", str, bytes) +class SingerMessageType(str, enum.Enum): + """Singer specification message types.""" + + RECORD = "RECORD" + SCHEMA = "SCHEMA" + STATE = "STATE" + ACTIVATE_VERSION = "ACTIVATE_VERSION" + BATCH = "BATCH" + + +def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: + """Exclude null values from a dictionary. + + Args: + pairs: The dictionary key-value pairs. + + Returns: + The filtered key-value pairs. + """ + return {key: value for key, value in pairs if value is not None} + + +@dataclass +class Message: + """Singer base message.""" + + type: SingerMessageType = field(init=False) + """The message type.""" + + def to_dict(self) -> dict[str, t.Any]: + """Return a dictionary representation of the message. + + Returns: + A dictionary with the defined message fields. + """ + return asdict(self, dict_factory=exclude_null_dict) + + @classmethod + def from_dict( + cls: t.Type[Message], # noqa: UP006 + data: dict[str, t.Any], + ) -> Message: + """Create an encoding from a dictionary. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + data.pop("type") + return cls(**data) + + +@dataclass +class RecordMessage(Message): + """Singer record message.""" + + stream: str + """The stream name.""" + + record: dict[str, t.Any] + """The record data.""" + + version: int | None = None + """The record version.""" + + time_extracted: datetime | None = None + """The time the record was extracted.""" + + @classmethod + def from_dict(cls: type[RecordMessage], data: dict[str, t.Any]) -> RecordMessage: + """Create a record message from a dictionary. + + This overrides the default conversion logic, since it uses unnecessary + deep copying and is very slow. + + Args: + data: The dictionary to create the message from. + + Returns: + The created message. + """ + time_extracted = data.get("time_extracted") + return cls( + stream=data["stream"], + record=data["record"], + version=data.get("version"), + time_extracted=datetime.fromisoformat(time_extracted) + if time_extracted + else None, + ) + + def to_dict(self) -> dict[str, t.Any]: + """Return a dictionary representation of the message. + + This overrides the default conversion logic, since it uses unnecessary + deep copying and is very slow. + + Returns: + A dictionary with the defined message fields. + """ + result: dict[str, t.Any] = { + "type": "RECORD", + "stream": self.stream, + "record": self.record, + } + if self.version is not None: + result["version"] = self.version + if self.time_extracted is not None: + result["time_extracted"] = self.time_extracted + return result + + def __post_init__(self) -> None: + """Post-init processing. + + Raises: + ValueError: If the time_extracted is not timezone-aware. + """ + self.type = SingerMessageType.RECORD + if self.time_extracted and not self.time_extracted.tzinfo: + msg = ( + "'time_extracted' must be either None or an aware datetime (with a " + "time zone)" + ) + raise ValueError(msg) + + if self.time_extracted: + self.time_extracted = self.time_extracted.astimezone(timezone.utc) + + +@dataclass +class SchemaMessage(Message): + """Singer schema message.""" + + stream: str + """The stream name.""" + + schema: dict[str, t.Any] + """The schema definition.""" + + key_properties: t.Sequence[str] | None = None + """The key properties.""" + + bookmark_properties: list[str] | None = None + """The bookmark properties.""" + + def __post_init__(self) -> None: + """Post-init processing. + + Raises: + ValueError: If bookmark_properties is not a string or list of strings. + """ + self.type = SingerMessageType.SCHEMA + + if isinstance(self.bookmark_properties, (str, bytes)): + self.bookmark_properties = [self.bookmark_properties] + if self.bookmark_properties and not isinstance(self.bookmark_properties, list): + msg = "bookmark_properties must be a string or list of strings" + raise ValueError(msg) + + +@dataclass +class StateMessage(Message): + """Singer state message.""" + + value: dict[str, t.Any] + """The state value.""" + + def __post_init__(self) -> None: + """Post-init processing.""" + self.type = SingerMessageType.STATE + + +@dataclass +class ActivateVersionMessage(Message): + """Singer activate version message.""" + + stream: str + """The stream name.""" + + version: int + """The version to activate.""" + + def __post_init__(self) -> None: + """Post-init processing.""" + self.type = SingerMessageType.ACTIVATE_VERSION + + class GenericSingerReader(t.Generic[T], metaclass=abc.ABCMeta): """Interface for all plugins reading Singer messages as strings or bytes.""" diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index ae8977c9e..ae5572c4c 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -2,226 +2,29 @@ from __future__ import annotations -import enum -import sys -import typing as t -from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone - -from singer_sdk._singerlib.json import serialize_json - -if sys.version_info < (3, 11): - from backports.datetime_fromisoformat import MonkeyPatch - - MonkeyPatch.patch_fromisoformat() - - -class SingerMessageType(str, enum.Enum): - """Singer specification message types.""" - - RECORD = "RECORD" - SCHEMA = "SCHEMA" - STATE = "STATE" - ACTIVATE_VERSION = "ACTIVATE_VERSION" - BATCH = "BATCH" - - -def exclude_null_dict(pairs: list[tuple[str, t.Any]]) -> dict[str, t.Any]: - """Exclude null values from a dictionary. - - Args: - pairs: The dictionary key-value pairs. - - Returns: - The filtered key-value pairs. - """ - return {key: value for key, value in pairs if value is not None} - - -@dataclass -class Message: - """Singer base message.""" - - type: SingerMessageType = field(init=False) - """The message type.""" - - def to_dict(self) -> dict[str, t.Any]: - """Return a dictionary representation of the message. - - Returns: - A dictionary with the defined message fields. - """ - return asdict(self, dict_factory=exclude_null_dict) - - @classmethod - def from_dict( - cls: t.Type[Message], # noqa: UP006 - data: dict[str, t.Any], - ) -> Message: - """Create an encoding from a dictionary. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - data.pop("type") - return cls(**data) - - -@dataclass -class RecordMessage(Message): - """Singer record message.""" - - stream: str - """The stream name.""" - - record: dict[str, t.Any] - """The record data.""" - - version: int | None = None - """The record version.""" - - time_extracted: datetime | None = None - """The time the record was extracted.""" - - @classmethod - def from_dict(cls: type[RecordMessage], data: dict[str, t.Any]) -> RecordMessage: - """Create a record message from a dictionary. - - This overrides the default conversion logic, since it uses unnecessary - deep copying and is very slow. - - Args: - data: The dictionary to create the message from. - - Returns: - The created message. - """ - time_extracted = data.get("time_extracted") - return cls( - stream=data["stream"], - record=data["record"], - version=data.get("version"), - time_extracted=datetime.fromisoformat(time_extracted) - if time_extracted - else None, - ) - - def to_dict(self) -> dict[str, t.Any]: - """Return a dictionary representation of the message. - - This overrides the default conversion logic, since it uses unnecessary - deep copying and is very slow. - - Returns: - A dictionary with the defined message fields. - """ - result: dict[str, t.Any] = { - "type": "RECORD", - "stream": self.stream, - "record": self.record, - } - if self.version is not None: - result["version"] = self.version - if self.time_extracted is not None: - result["time_extracted"] = self.time_extracted - return result - - def __post_init__(self) -> None: - """Post-init processing. - - Raises: - ValueError: If the time_extracted is not timezone-aware. - """ - self.type = SingerMessageType.RECORD - if self.time_extracted and not self.time_extracted.tzinfo: - msg = ( - "'time_extracted' must be either None or an aware datetime (with a " - "time zone)" - ) - raise ValueError(msg) - - if self.time_extracted: - self.time_extracted = self.time_extracted.astimezone(timezone.utc) - - -@dataclass -class SchemaMessage(Message): - """Singer schema message.""" - - stream: str - """The stream name.""" - - schema: dict[str, t.Any] - """The schema definition.""" - - key_properties: t.Sequence[str] | None = None - """The key properties.""" - - bookmark_properties: list[str] | None = None - """The bookmark properties.""" - - def __post_init__(self) -> None: - """Post-init processing. - - Raises: - ValueError: If bookmark_properties is not a string or list of strings. - """ - self.type = SingerMessageType.SCHEMA - - if isinstance(self.bookmark_properties, (str, bytes)): - self.bookmark_properties = [self.bookmark_properties] - if self.bookmark_properties and not isinstance(self.bookmark_properties, list): - msg = "bookmark_properties must be a string or list of strings" - raise ValueError(msg) - - -@dataclass -class StateMessage(Message): - """Singer state message.""" - - value: dict[str, t.Any] - """The state value.""" - - def __post_init__(self) -> None: - """Post-init processing.""" - self.type = SingerMessageType.STATE - - -@dataclass -class ActivateVersionMessage(Message): - """Singer activate version message.""" - - stream: str - """The stream name.""" - - version: int - """The version to activate.""" - - def __post_init__(self) -> None: - """Post-init processing.""" - self.type = SingerMessageType.ACTIVATE_VERSION - - -def format_message(message: Message) -> str: - """Format a message as a JSON string. - - Args: - message: The message to format. - - Returns: - The formatted message. - """ - return serialize_json(message.to_dict()) - - -def write_message(message: Message) -> None: - """Write a message to stdout. - - Args: - message: The message to write. - """ - sys.stdout.write(format_message(message) + "\n") - sys.stdout.flush() +from singer_sdk._singerlib.encoding import SingerWriter +from singer_sdk._singerlib.encoding.base import ( + ActivateVersionMessage, + Message, + RecordMessage, + SchemaMessage, + SingerMessageType, + StateMessage, + exclude_null_dict, +) + +__all__ = [ + "ActivateVersionMessage", + "Message", + "RecordMessage", + "SchemaMessage", + "SingerMessageType", + "StateMessage", + "exclude_null_dict", + "format_message", + "write_message", +] + +WRITER = SingerWriter() +format_message = WRITER.format_message +write_message = WRITER.write_message From a5f67a538d9a27d36409771ebcb17b22227d72d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Fri, 12 Jul 2024 21:07:52 -0600 Subject: [PATCH 24/25] Test records with version --- tests/_singerlib/test_messages.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/_singerlib/test_messages.py b/tests/_singerlib/test_messages.py index b433e9610..f9cd73bbe 100644 --- a/tests/_singerlib/test_messages.py +++ b/tests/_singerlib/test_messages.py @@ -93,6 +93,21 @@ def test_record_message_time_extracted_to_utc(): assert record.time_extracted == datetime.datetime(2021, 1, 1, 9, tzinfo=UTC) +def test_record_message_with_version(): + record = singer.RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + version=1614556800, + ) + assert record.version == 1614556800 + assert record.to_dict() == { + "type": "RECORD", + "stream": "test", + "record": {"id": 1, "name": "test"}, + "version": 1614556800, + } + + def test_schema_message(): schema = singer.SchemaMessage( stream="test", From f848370970ed0de37fa5ea094ac9f372d2b7fa91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 16 Jul 2024 17:30:36 -0600 Subject: [PATCH 25/25] Move encodings to a private submodule --- singer_sdk/_singerlib/{encoding => _encoding}/__init__.py | 0 singer_sdk/_singerlib/{encoding => _encoding}/base.py | 0 singer_sdk/_singerlib/{encoding => _encoding}/simple.py | 0 singer_sdk/_singerlib/messages.py | 4 ++-- singer_sdk/io_base.py | 2 +- 5 files changed, 3 insertions(+), 3 deletions(-) rename singer_sdk/_singerlib/{encoding => _encoding}/__init__.py (100%) rename singer_sdk/_singerlib/{encoding => _encoding}/base.py (100%) rename singer_sdk/_singerlib/{encoding => _encoding}/simple.py (100%) diff --git a/singer_sdk/_singerlib/encoding/__init__.py b/singer_sdk/_singerlib/_encoding/__init__.py similarity index 100% rename from singer_sdk/_singerlib/encoding/__init__.py rename to singer_sdk/_singerlib/_encoding/__init__.py diff --git a/singer_sdk/_singerlib/encoding/base.py b/singer_sdk/_singerlib/_encoding/base.py similarity index 100% rename from singer_sdk/_singerlib/encoding/base.py rename to singer_sdk/_singerlib/_encoding/base.py diff --git a/singer_sdk/_singerlib/encoding/simple.py b/singer_sdk/_singerlib/_encoding/simple.py similarity index 100% rename from singer_sdk/_singerlib/encoding/simple.py rename to singer_sdk/_singerlib/_encoding/simple.py diff --git a/singer_sdk/_singerlib/messages.py b/singer_sdk/_singerlib/messages.py index ae5572c4c..b08739126 100644 --- a/singer_sdk/_singerlib/messages.py +++ b/singer_sdk/_singerlib/messages.py @@ -2,8 +2,8 @@ from __future__ import annotations -from singer_sdk._singerlib.encoding import SingerWriter -from singer_sdk._singerlib.encoding.base import ( +from singer_sdk._singerlib._encoding import SingerWriter +from singer_sdk._singerlib._encoding.base import ( ActivateVersionMessage, Message, RecordMessage, diff --git a/singer_sdk/io_base.py b/singer_sdk/io_base.py index f9041beea..ddc933eb8 100644 --- a/singer_sdk/io_base.py +++ b/singer_sdk/io_base.py @@ -2,4 +2,4 @@ from __future__ import annotations -from singer_sdk._singerlib.encoding import * # noqa: F403 +from singer_sdk._singerlib._encoding import * # noqa: F403