diff --git a/singer_sdk/_singerlib/encoding/_msgspec.py b/singer_sdk/_singerlib/encoding/_msgspec.py index 74178c597..1cbb6a453 100644 --- a/singer_sdk/_singerlib/encoding/_msgspec.py +++ b/singer_sdk/_singerlib/encoding/_msgspec.py @@ -91,4 +91,4 @@ def write_message(self, message: Message) -> None: message: The message to write. """ sys.stdout.buffer.write(self.format_message(message) + b"\n") - sys.stdout.buffer.flush() + sys.stdout.flush() diff --git a/tests/_singerlib/_encoding/test_msgspec.py b/tests/_singerlib/_encoding/test_msgspec.py deleted file mode 100644 index 3fca4278c..000000000 --- a/tests/_singerlib/_encoding/test_msgspec.py +++ /dev/null @@ -1,41 +0,0 @@ -from __future__ import annotations # noqa: INP001 - -import pytest - -from singer_sdk._singerlib.encoding._msgspec import dec_hook, enc_hook - - -@pytest.mark.parametrize( - "test_type,test_value,expected_value,expected_type", - [ - pytest.param( - int, - 1, - "1", - str, - id="int-to-str", - ), - ], -) -def test_dec_hook(test_type, test_value, expected_value, expected_type): - returned = dec_hook(type=test_type, obj=test_value) - returned_type = type(returned) - - assert returned == expected_value - assert returned_type == expected_type - - -@pytest.mark.parametrize( - "test_value,expected_value", - [ - pytest.param( - 1, - "1", - id="int-to-str", - ), - ], -) -def test_enc_hook(test_value, expected_value): - returned = enc_hook(obj=test_value) - - assert returned == expected_value diff --git a/tests/_singerlib/encoding/conftest.py b/tests/_singerlib/encoding/conftest.py new file mode 100644 index 000000000..82a265b1d --- /dev/null +++ b/tests/_singerlib/encoding/conftest.py @@ -0,0 +1,35 @@ +from __future__ import annotations # noqa: INP001 + +import json + +import pytest + +from singer_sdk._singerlib import RecordMessage + + +@pytest.fixture +def bench_record(): + return { + "stream": "users", + "type": "RECORD", + "record": { + "Id": 1, + "created_at": "2021-01-01T00:08:00-07:00", + 
"updated_at": "2022-01-02T00:09:00-07:00", + "deleted_at": "2023-01-03T00:10:00-07:00", + "value": 1.23, + "RelatedId": 32412, + "TypeId": 1, + }, + "time_extracted": "2023-01-01T11:00:00.00000-07:00", + } + + +@pytest.fixture +def bench_record_message(bench_record): + return RecordMessage.from_dict(bench_record) + + +@pytest.fixture +def bench_encoded_record(bench_record): + return json.dumps(bench_record) diff --git a/tests/_singerlib/encoding/test_msgspec.py b/tests/_singerlib/encoding/test_msgspec.py new file mode 100644 index 000000000..8a0d3ea94 --- /dev/null +++ b/tests/_singerlib/encoding/test_msgspec.py @@ -0,0 +1,167 @@ +"""Test IO operations for msgspec Singer reader and writer.""" # noqa: INP001 + +from __future__ import annotations + +import decimal +import io +import itertools +from contextlib import nullcontext, redirect_stdout +from textwrap import dedent + +import pytest + +from singer_sdk._singerlib import RecordMessage +from singer_sdk._singerlib.encoding._msgspec import ( + MsgSpecReader, + MsgSpecWriter, + dec_hook, + enc_hook, +) +from singer_sdk._singerlib.exceptions import InvalidInputLine + + +@pytest.mark.parametrize( + "test_type,test_value,expected_value,expected_type", + [ + pytest.param( + int, + 1, + "1", + str, + id="int-to-str", + ), + ], +) +def test_dec_hook(test_type, test_value, expected_value, expected_type): + returned = dec_hook(type=test_type, obj=test_value) + returned_type = type(returned) + + assert returned == expected_value + assert returned_type == expected_type + + +@pytest.mark.parametrize( + "test_value,expected_value", + [ + pytest.param( + 1, + "1", + id="int-to-str", + ), + ], +) +def test_enc_hook(test_value, expected_value): + returned = enc_hook(obj=test_value) + + assert returned == expected_value + + +class DummyReader(MsgSpecReader): + def _process_activate_version_message(self, message_dict: dict) -> None: + pass + + def _process_batch_message(self, message_dict: dict) -> None: + pass + + def 
_process_record_message(self, message_dict: dict) -> None: + pass + + def _process_schema_message(self, message_dict: dict) -> None: + pass + + def _process_state_message(self, message_dict: dict) -> None: + pass + + +@pytest.mark.parametrize( + "line,expected,exception", + [ + pytest.param( + "not-valid-json", + None, + pytest.raises(InvalidInputLine), + id="unparsable", + ), + pytest.param( + '{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}', + { + "type": "RECORD", + "stream": "users", + "record": {"id": 1, "value": decimal.Decimal("1.23")}, + }, + nullcontext(), + id="record", + ), + ], +) +def test_deserialize(line, expected, exception): + reader = DummyReader() + with exception: + assert reader.deserialize_json(line) == expected + + +def test_listen(): + reader = DummyReader() + input_lines = io.StringIO( + dedent("""\ + {"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} + {"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} + {"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} + """) # noqa: E501 + ) + reader.listen(input_lines) + + +def test_listen_unknown_message(): + reader = DummyReader() + input_lines = io.StringIO('{"type": "UNKNOWN"}\n') + with pytest.raises(ValueError, match="Unknown message type"): + reader.listen(input_lines) + + +def test_write_message(): + writer = MsgSpecWriter() + message = RecordMessage( + stream="test", + record={"id": 1, 
"name": "test"}, + ) + with redirect_stdout(io.TextIOWrapper(io.BytesIO())) as out: # noqa: PLW1514 + writer.write_message(message) + + out.seek(0) + assert out.read() == ( + '{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' + ) + + +# Benchmark Tests + + +def test_bench_format_message(benchmark, bench_record_message: RecordMessage): + """Run benchmark for MsgSpecWriter.format_message.""" + number_of_runs = 1000 + + writer = MsgSpecWriter() + + def run_format_message(): + for record in itertools.repeat(bench_record_message, number_of_runs): + writer.format_message(record) + + benchmark(run_format_message) + + +def test_bench_deserialize_json(benchmark, bench_encoded_record: str): + """Run benchmark for MsgSpecReader.deserialize_json.""" + number_of_runs = 1000 + + reader = DummyReader() + + def run_deserialize_json(): + for record in itertools.repeat(bench_encoded_record, number_of_runs): + reader.deserialize_json(record) + + benchmark(run_deserialize_json) diff --git a/tests/_singerlib/encoding/test_simple.py b/tests/_singerlib/encoding/test_simple.py new file mode 100644 index 000000000..a0dfb7d14 --- /dev/null +++ b/tests/_singerlib/encoding/test_simple.py @@ -0,0 +1,125 @@ +"""Test IO operations for simple Singer reader and writer.""" # noqa: INP001 + +from __future__ import annotations + +import decimal +import io +import itertools +from contextlib import nullcontext, redirect_stdout +from textwrap import dedent + +import pytest + +from singer_sdk._singerlib import RecordMessage +from singer_sdk._singerlib.encoding._simple import SingerReader, SingerWriter +from singer_sdk._singerlib.exceptions import InvalidInputLine + + +class DummyReader(SingerReader): + def _process_activate_version_message(self, message_dict: dict) -> None: + pass + + def _process_batch_message(self, message_dict: dict) -> None: + pass + + def _process_record_message(self, message_dict: dict) -> None: + pass + + def _process_schema_message(self, message_dict: dict) 
-> None: + pass + + def _process_state_message(self, message_dict: dict) -> None: + pass + + +@pytest.mark.parametrize( + "line,expected,exception", + [ + pytest.param( + "not-valid-json", + None, + pytest.raises(InvalidInputLine), + id="unparsable", + ), + pytest.param( + '{"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}}', + { + "type": "RECORD", + "stream": "users", + "record": {"id": 1, "value": decimal.Decimal("1.23")}, + }, + nullcontext(), + id="record", + ), + ], +) +def test_deserialize(line, expected, exception): + reader = DummyReader() + with exception: + assert reader.deserialize_json(line) == expected + + +def test_listen(): + reader = DummyReader() + input_lines = io.StringIO( + dedent("""\ + {"type": "SCHEMA", "stream": "users", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "RECORD", "stream": "users", "record": {"id": 1, "value": 1.23}} + {"type": "RECORD", "stream": "users", "record": {"id": 2, "value": 2.34}} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}}}} + {"type": "SCHEMA", "stream": "batches", "schema": {"type": "object", "properties": {"id": {"type": "integer"}, "value": {"type": "number"}}}} + {"type": "BATCH", "stream": "batches", "encoding": {"format": "jsonl", "compression": "gzip"}, "manifest": ["file1.jsonl.gz", "file2.jsonl.gz"]} + {"type": "STATE", "value": {"bookmarks": {"users": {"id": 2}, "batches": {"id": 1000000}}}} + """) # noqa: E501 + ) + reader.listen(input_lines) + + +def test_listen_unknown_message(): + reader = DummyReader() + input_lines = io.StringIO('{"type": "UNKNOWN"}\n') + with pytest.raises(ValueError, match="Unknown message type"): + reader.listen(input_lines) + + +def test_write_message(): + writer = SingerWriter() + message = RecordMessage( + stream="test", + record={"id": 1, "name": "test"}, + ) + with redirect_stdout(io.StringIO()) as out: + writer.write_message(message) + + assert out.getvalue() == ( + 
'{"type":"RECORD","stream":"test","record":{"id":1,"name":"test"}}\n' + ) + + +# Benchmark Tests + + +def test_bench_format_message(benchmark, bench_record_message: RecordMessage): + """Run benchmark for SingerWriter.format_message.""" + number_of_runs = 1000 + + writer = SingerWriter() + + def run_format_message(): + for record in itertools.repeat(bench_record_message, number_of_runs): + writer.format_message(record) + + benchmark(run_format_message) + + +def test_bench_deserialize_json(benchmark, bench_encoded_record: str): + """Run benchmark for SingerReader.deserialize_json.""" + number_of_runs = 1000 + + reader = DummyReader() + + def run_deserialize_json(): + for record in itertools.repeat(bench_encoded_record, number_of_runs): + reader.deserialize_json(record) + + benchmark(run_deserialize_json)