diff --git a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md index 5166e0fc1dc05..e7d36c2769c21 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md +++ b/airbyte-integrations/bases/connector-acceptance-test/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 3.6.0 +Relaxing CATs validation when a stream has a primary key defined. + ## 3.5.0 Add `validate_stream_statuses` to TestBasicRead.test_read:: Validate all statuses for all streams in the catalogs were emitted in correct order. diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py index 29e1fc4e39fd5..ce2df96dc858d 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/tests/test_core.py @@ -12,7 +12,7 @@ from os.path import splitext from pathlib import Path from threading import Thread -from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tuple +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple from xmlrpc.client import Boolean import connector_acceptance_test.utils.docs as docs_utils @@ -21,6 +21,7 @@ import pytest import requests from airbyte_protocol.models import ( + AirbyteMessage, AirbyteRecordMessage, AirbyteStream, AirbyteStreamStatus, @@ -838,14 +839,23 @@ def primary_keys_for_records(streams, records): for stream in streams_with_primary_key: stream_records = [r for r in records if r.stream == stream.stream.name] for stream_record in stream_records: - pk_values = {} - for pk_path in stream.stream.source_defined_primary_key: - pk_value = reduce(lambda data, key: data.get(key) if isinstance(data, dict) else None, pk_path, stream_record.data) - pk_values[tuple(pk_path)] = pk_value - + pk_values = _extract_primary_key_value(stream_record.data, stream.stream.source_defined_primary_key) yield pk_values, stream_record +def _extract_pk_values(records: Iterable[Mapping[str, Any]], primary_key: List[List[str]]) -> Iterable[dict[Tuple[str], Any]]: + for record in records: + yield _extract_primary_key_value(record, primary_key) + + +def _extract_primary_key_value(record: Mapping[str, Any], primary_key: List[List[str]]) -> dict[Tuple[str], Any]: + pk_values = {} + for pk_path in primary_key: + pk_value: Any = reduce(lambda data, key: data.get(key) if isinstance(data, dict) else None, pk_path, record) + pk_values[tuple(pk_path)] = pk_value + return pk_values + + @pytest.mark.default_timeout(TEN_MINUTES) class TestBasicRead(BaseTest): @staticmethod @@ -953,6 +963,7 @@ def _validate_expected_records( flags, ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]], detailed_logger: Logger, + configured_catalog: ConfiguredAirbyteCatalog, ): """ We expect some records from stream to match expected_records, partially or fully, in exact or any order. @@ -972,6 +983,7 @@ def _validate_expected_records( extra_records=flags.extra_records, ignored_fields=ignored_field_names, detailed_logger=detailed_logger, + configured_catalog=configured_catalog, ) @pytest.fixture(name="should_validate_schema") @@ -1081,6 +1093,7 @@ async def test_read( flags=expect_records_config, ignored_fields=ignored_fields, detailed_logger=detailed_logger, + configured_catalog=configured_catalog, ) if should_validate_stream_statuses: @@ -1142,8 +1155,45 @@ def compare_records( extra_records: bool, ignored_fields: List[str], detailed_logger: Logger, + configured_catalog: ConfiguredAirbyteCatalog, ): """Compare records using combination of restrictions""" + configured_streams = [stream for stream in configured_catalog.streams if stream.stream.name == stream_name] + if len(configured_streams) != 1: + raise ValueError(f"Expected exactly one stream matching name {stream_name} but got {len(configured_streams)}") + + configured_stream = configured_streams[0] + if configured_stream.stream.source_defined_primary_key: + # as part of the migration for relaxing CATs, we are starting only with the streams that defines primary keys + expected_primary_keys = list(_extract_pk_values(expected, configured_stream.stream.source_defined_primary_key)) + actual_primary_keys = list(_extract_pk_values(actual, configured_stream.stream.source_defined_primary_key)) + if exact_order: + assert ( + actual_primary_keys[: len(expected_primary_keys)] == expected_primary_keys + ), f"Expected to see those primary keys in order in the actual response for stream {stream_name}." + else: + expected_but_not_found = set(map(make_hashable, expected_primary_keys)).difference( + set(map(make_hashable, actual_primary_keys)) + ) + assert ( + not expected_but_not_found + ), f"Expected to see those primary keys in the actual response for stream {stream_name} but they were not found." + else: + TestBasicRead.legacy_compare_records( + stream_name, actual, expected, extra_fields, exact_order, extra_records, ignored_fields, detailed_logger + ) + + @staticmethod + def legacy_compare_records( + stream_name: str, + actual: List[Mapping[str, Any]], + expected: List[Mapping[str, Any]], + extra_fields: bool, + exact_order: bool, + extra_records: bool, + ignored_fields: List[str], + detailed_logger: Logger, + ): if exact_order: if ignored_fields: for item in actual: diff --git a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/utils/asserts.py b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/utils/asserts.py index 8362aac100537..36a3e01c11588 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/utils/asserts.py +++ b/airbyte-integrations/bases/connector-acceptance-test/connector_acceptance_test/utils/asserts.py @@ -88,7 +88,10 @@ def verify_records_schema( stream_validators = {} for stream in catalog.streams: schema_to_validate_against = stream.stream.json_schema - validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger + # We will be disabling strict `NoAdditionalPropertiesValidator` until we have a better plan for schema validation. The consequence + # is that we will lack visibility on new fields that are not added on the root level (root level is validated by Datadog) + # validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger + validator = Draft7ValidatorWithStrictInteger stream_validators[stream.stream.name] = validator(schema_to_validate_against, format_checker=CustomFormatChecker()) stream_errors = defaultdict(dict) for record in records: diff --git a/airbyte-integrations/bases/connector-acceptance-test/poetry.lock b/airbyte-integrations/bases/connector-acceptance-test/poetry.lock index fff2f1561b59f..508501085564a 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/poetry.lock +++ b/airbyte-integrations/bases/connector-acceptance-test/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. [[package]] name = "airbyte-protocol-models" @@ -1226,6 +1226,7 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, + {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a08c6f0fe150303c1c6b71ebcd7213c2858041a7e01975da3a99aed1e7a378ef"}, {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, @@ -1687,4 +1688,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "1c468b66c56cfccd5e5bff7d9c69f01c729d828132a8a56a7089447f5da0f534" +content-hash = "9d53af4fe5cca16b6ce5a61f3f7d286b561af9920f77163e00e4e59eacc9e4f6" diff --git a/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml b/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml index ed80ea830fd8b..41522609d0cbd 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml +++ b/airbyte-integrations/bases/connector-acceptance-test/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "connector-acceptance-test" -version = "3.5.0" +version = "3.6.0" description = "Contains acceptance tests for connectors." authors = ["Airbyte "] license = "MIT" diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py index d0732458ec4da..0b78b93148c6f 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_asserts.py @@ -87,57 +87,6 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog): ] -@pytest.mark.parametrize( - "json_schema, record, should_fail", - [ - ({"type": "object", "properties": {"a": {"type": "string"}}}, {"a": "str", "b": "extra_string"}, True), - ( - {"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}}, - {"a": "str", "some_obj": {"b": "extra_string"}}, - False, - ), - ( - { - "type": "object", - "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}}, - }, - {"a": "str", "some_obj": {"a": "str", "b": "extra_string"}}, - True, - ), - ( - {"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}}, - {"a": "str", "b": [{"a": "extra_string"}]}, - False, - ), - ( - { - "type": "object", - "properties": { - "a": {"type": "string"}, - "b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}}, - }, - }, - {"a": "str", "b": [{"a": "string", "b": "extra_string"}]}, - True, - ), - ], - ids=[ - "simple_schema_and_record_with_extra_property", - "schema_with_object_without_properties_and_record_with_object_with_property", - "schema_with_object_with_properties_and_record_with_object_with_extra_property", - "schema_with_array_of_objects_without_properties_and_record_with_array_of_objects_with_property", - "schema_with_array_of_objects_with_properties_and_record_with_array_of_objects_with_extra_property", - ], -) -def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail): - """Test that fail_on_extra_columns works correctly with nested objects, array of objects""" - configured_catalog.streams[0].stream.json_schema = json_schema - records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)] - streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True) - errors = [error.message for error in streams_with_errors["my_stream"].values()] - assert errors if should_fail else not errors - - @pytest.mark.parametrize( "record, configured_catalog, valid", [ diff --git a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py index 3e5dbda69f3ea..fd7f8b020a0b3 100644 --- a/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py +++ b/airbyte-integrations/bases/connector-acceptance-test/unit_tests/test_core.py @@ -592,60 +592,69 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca assert configured_catalog == test_core.build_configured_catalog_from_custom_catalog.return_value +_DEFAULT_RECORD_CONFIG = ExpectedRecordsConfig(path="foobar") + + @pytest.mark.parametrize( - "schema, ignored_fields, expect_records_config, record, expected_records_by_stream, expectation", + "schema, ignored_fields, expect_records_config, record, expected_records_by_stream, primary_key, expectation", [ - ({"type": "object"}, {}, ExpectedRecordsConfig(path="foobar"), {"aa": 23}, {}, does_not_raise()), - ({"type": "object"}, {}, ExpectedRecordsConfig(path="foobar"), {}, {}, does_not_raise()), + ({"type": "object"}, {}, _DEFAULT_RECORD_CONFIG, {"aa": 23}, {}, None, does_not_raise()), + ({"type": "object"}, {}, _DEFAULT_RECORD_CONFIG, {}, {}, None, does_not_raise()), ( {"type": "object", "properties": {"created": {"type": "string"}}}, {}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"aa": 23}, {}, + None, pytest.raises(AssertionError, match="should have some fields mentioned by json schema"), ), ( {"type": "object", "properties": {"created": {"type": "string"}}}, {}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"created": "23"}, {}, + None, does_not_raise(), ), ( {"type": "object", "properties": {"created": {"type": "string"}}}, {}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"root": {"created": "23"}}, {}, + None, pytest.raises(AssertionError, match="should have some fields mentioned by json schema"), ), # Recharge shop stream case ( {"type": "object", "properties": {"shop": {"type": ["null", "object"]}, "store": {"type": ["null", "object"]}}}, {}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"shop": {"a": "23"}, "store": {"b": "23"}}, {}, + None, does_not_raise(), ), # Fail when expected and actual records are not equal ( {"type": "object"}, {}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"constant_field": "must equal", "fast_changing_field": [{"field": 2}]}, {"test_stream": [{"constant_field": "must equal", "fast_changing_field": [{"field": 1}]}]}, + None, pytest.raises(Failed, match="Stream test_stream: All expected records must be produced"), ), # Expected and Actual records are not equal but we ignore fast changing field ( {"type": "object"}, {"test_stream": [IgnoredFieldsConfiguration(name="fast_changing_field/*/field", bypass_reason="test")]}, - ExpectedRecordsConfig(path="foobar"), + _DEFAULT_RECORD_CONFIG, {"constant_field": "must equal", "fast_changing_field": [{"field": 2}]}, {"test_stream": [{"constant_field": "must equal", "fast_changing_field": [{"field": 1}]}]}, + None, does_not_raise(), ), # Fail when expected and actual records are not equal and exact_order=True @@ -655,6 +664,7 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca ExpectedRecordsConfig(extra_fields=False, exact_order=True, extra_records=True, path="foobar"), {"constant_field": "must equal", "fast_changing_field": [{"field": 2}]}, {"test_stream": [{"constant_field": "must equal", "fast_changing_field": [{"field": 1}]}]}, + None, pytest.raises(AssertionError, match="Stream test_stream: Mismatch of record order or values"), ), # Expected and Actual records are not equal but we ignore fast changing field (for case when exact_order=True) @@ -664,15 +674,81 @@ def test_configured_catalog_fixture(mocker, test_strictness_level, configured_ca ExpectedRecordsConfig(extra_fields=False, exact_order=True, extra_records=True, path="foobar"), {"constant_field": "must equal", "fast_changing_field": [{"field": 1}]}, {"test_stream": [{"constant_field": "must equal", "fast_changing_field": [{"field": 2}]}]}, + None, + does_not_raise(), + ), + # Match by primary key + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"primary_key": "a primary_key"}, + {"test_stream": [{"primary_key": "a primary_key"}]}, + [["primary_key"]], + does_not_raise(), + ), + # Match by primary key when actual has added fields + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"primary_key": "a primary_key", "a field that should be ignored": "ignored value"}, + {"test_stream": [{"primary_key": "a primary_key"}]}, + [["primary_key"]], + does_not_raise(), + ), + # Match by primary key when non primary key field values differ + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"primary_key": "a primary_key", "matching key": "value 1"}, + {"test_stream": [{"primary_key": "a primary_key", "non matching key": "value 2"}]}, + [["primary_key"]], + does_not_raise(), + ), + # Match nested primary key + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"top_level_field": {"child_field": "a primary_key"}, "matching key": "value 1"}, + {"test_stream": [{"top_level_field": {"child_field": "a primary_key"}, "matching key": "value 1"}]}, + [["top_level_field", "child_field"]], + does_not_raise(), + ), + # Match composite primary key + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"primary_key_1": "a primary_key_1", "primary_key_2": "a primary_key_2"}, + {"test_stream": [{"primary_key_1": "a primary_key_1", "primary_key_2": "a primary_key_2"}]}, + [["primary_key_1"], ["primary_key_2"]], + does_not_raise(), + ), + # Match composite and nested primary key + ( + {"type": "object"}, + {}, + _DEFAULT_RECORD_CONFIG, + {"primary_key_1": "a primary_key_1", "primary_key_2_1": {"primary_key_2_2": "primary_key_2"}}, + {"test_stream": [{"primary_key_1": "a primary_key_1", "primary_key_2_1": {"primary_key_2_2": "primary_key_2"}}]}, + [["primary_key_1"], ["primary_key_2_1", "primary_key_2_2"]], does_not_raise(), ), ], ) -async def test_read(mocker, schema, ignored_fields, expect_records_config, record, expected_records_by_stream, expectation): +async def test_read(mocker, schema, ignored_fields, expect_records_config, record, expected_records_by_stream, primary_key, expectation): configured_catalog = ConfiguredAirbyteCatalog( streams=[ ConfiguredAirbyteStream( - stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), + stream=AirbyteStream.parse_obj({ + "name": "test_stream", + "json_schema": schema, + "supported_sync_modes": ["full_refresh"], + "source_defined_primary_key": primary_key + }), sync_mode="full_refresh", destination_sync_mode="overwrite", ) @@ -702,78 +778,6 @@ async def test_read(mocker, schema, ignored_fields, expect_records_config, recor ) -@pytest.mark.parametrize( - "config_fail_on_extra_columns, record_has_unexpected_column, expectation_should_fail", - [ - (True, True, True), - (True, False, False), - (False, False, False), - (False, True, False), - ], -) -@pytest.mark.parametrize("additional_properties", [True, False, None]) -async def test_fail_on_extra_columns( - mocker, config_fail_on_extra_columns, record_has_unexpected_column, expectation_should_fail, additional_properties -): - schema = {"type": "object", "properties": {"field_1": {"type": ["string"]}, "field_2": {"type": ["string"]}}} - if additional_properties: - schema["additionalProperties"] = additional_properties - - record = {"field_1": "value", "field_2": "value"} - if record_has_unexpected_column: - record["surprise_field"] = "value" - - configured_catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream.parse_obj({"name": "test_stream", "json_schema": schema, "supported_sync_modes": ["full_refresh"]}), - sync_mode="full_refresh", - destination_sync_mode="overwrite", - ) - ] - ) - docker_runner_mock = mocker.MagicMock( - call_read=mocker.AsyncMock( - return_value=[AirbyteMessage(type=Type.RECORD, record=AirbyteRecordMessage(stream="test_stream", data=record, emitted_at=111))] - ) - ) - - t = test_core.TestBasicRead() - if expectation_should_fail: - with pytest.raises(Failed, match="test_stream"): - await t.test_read( - connector_config=None, - configured_catalog=configured_catalog, - expect_records_config=ExpectedRecordsConfig(path="foobar"), - should_validate_schema=True, - should_validate_data_points=False, - should_validate_stream_statuses=False, - should_fail_on_extra_columns=config_fail_on_extra_columns, - empty_streams=set(), - expected_records_by_stream={}, - docker_runner=docker_runner_mock, - ignored_fields=None, - detailed_logger=MagicMock(), - certified_file_based_connector=False, - ) - else: - t.test_read( - connector_config=None, - configured_catalog=configured_catalog, - expect_records_config=ExpectedRecordsConfig(path="foobar"), - should_validate_schema=True, - should_validate_data_points=False, - should_validate_stream_statuses=False, - should_fail_on_extra_columns=config_fail_on_extra_columns, - empty_streams=set(), - expected_records_by_stream={}, - docker_runner=docker_runner_mock, - ignored_fields=None, - detailed_logger=MagicMock(), - certified_file_based_connector=False, - ) - - @pytest.mark.parametrize( "output, expect_trace_message_on_failure, should_fail", [ @@ -1459,7 +1463,7 @@ async def test_read_validate_async_output_stream_statuses(mocker): await t.test_read( connector_config=None, configured_catalog=configured_catalog, - expect_records_config=ExpectedRecordsConfig(path="foobar"), + expect_records_config=_DEFAULT_RECORD_CONFIG, should_validate_schema=False, should_validate_data_points=False, should_validate_stream_statuses=True, @@ -1559,7 +1563,7 @@ async def test_read_validate_stream_statuses_exceptions(mocker, output): await t.test_read( connector_config=None, configured_catalog=configured_catalog, - expect_records_config=ExpectedRecordsConfig(path="foobar"), + expect_records_config=_DEFAULT_RECORD_CONFIG, should_validate_schema=False, should_validate_data_points=False, should_validate_stream_statuses=True,