[ISSUE #35110] match CATs records only one primary key when primary k… #35556

Merged · merged 8 commits on Feb 27, 2024 · Changes from 5 commits
CHANGELOG.md
@@ -1,5 +1,8 @@
 # Changelog

+## 3.6.0
+Relaxing CATs validation when a stream has a primary key defined.
+
 ## 3.5.0
 Add `validate_stream_statuses` to TestBasicRead.test_read: validate that all statuses for all streams in the catalogs were emitted in the correct order.

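For context on what is being relaxed: a stream's `source_defined_primary_key` is a list of field paths, so a primary key can be composite and can reach into nested objects. A minimal sketch of such a stream (the name, schema, and key below are illustrative, not taken from this PR); the `tuple(pk_path)` keys seen later in `_extract_primary_key` come from exactly these paths.

from airbyte_protocol.models import AirbyteStream, SyncMode

# Hypothetical stream: the primary key is a list of paths, here a composite
# key made of one top-level field and one nested field.
stream = AirbyteStream(
    name="my_stream",
    json_schema={"type": "object"},
    supported_sync_modes=[SyncMode.full_refresh],
    source_defined_primary_key=[["id"], ["metadata", "version"]],
)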
connector_acceptance_test/tests/test_core.py
@@ -12,7 +12,7 @@
 from os.path import splitext
 from pathlib import Path
 from threading import Thread
-from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Set, Tuple
+from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple
 from xmlrpc.client import Boolean

 import connector_acceptance_test.utils.docs as docs_utils
@@ -21,6 +21,7 @@
 import pytest
 import requests
 from airbyte_protocol.models import (
+    AirbyteMessage,
     AirbyteRecordMessage,
     AirbyteStream,
     AirbyteStreamStatus,
@@ -838,14 +839,23 @@ def primary_keys_for_records(streams, records):
     for stream in streams_with_primary_key:
         stream_records = [r for r in records if r.stream == stream.stream.name]
         for stream_record in stream_records:
-            pk_values = {}
-            for pk_path in stream.stream.source_defined_primary_key:
-                pk_value = reduce(lambda data, key: data.get(key) if isinstance(data, dict) else None, pk_path, stream_record.data)
-                pk_values[tuple(pk_path)] = pk_value
-
+            pk_values = _extract_primary_key(stream_record.data, stream.stream.source_defined_primary_key)
             yield pk_values, stream_record


+def _extract_primary_keys(records: Iterable[Mapping[str, Any]], primary_key: List[List[str]]) -> Iterable[Mapping[Tuple[str, ...], Any]]:
+    for record in records:
+        yield _extract_primary_key(record, primary_key)
+
+
+def _extract_primary_key(record: Mapping[str, Any], primary_key: List[List[str]]) -> Mapping[Tuple[str, ...], Any]:
+    pk_values = {}
+    for pk_path in primary_key:
+        pk_value = reduce(lambda data, key: data.get(key) if isinstance(data, dict) else None, pk_path, record)  # type: ignore  # we assume that the path defined by the primary key exists
+        pk_values[tuple(pk_path)] = pk_value
+    return pk_values


 @pytest.mark.default_timeout(TEN_MINUTES)
 class TestBasicRead(BaseTest):
     @staticmethod
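A minimal sketch of what the `reduce` in `_extract_primary_key` does for a single path; the record data here is made up:

from functools import reduce

record = {"id": 1, "metadata": {"version": 3}}
pk_path = ["metadata", "version"]

# Descend one key at a time; a missing or non-dict intermediate yields None.
pk_value = reduce(lambda data, key: data.get(key) if isinstance(data, dict) else None, pk_path, record)
assert pk_value == 3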
@@ -953,6 +963,7 @@ def _validate_expected_records(
         flags,
         ignored_fields: Optional[Mapping[str, List[IgnoredFieldsConfiguration]]],
         detailed_logger: Logger,
+        configured_catalog: ConfiguredAirbyteCatalog,
     ):
         """
         We expect some records from stream to match expected_records, partially or fully, in exact or any order.
@@ -972,6 +983,7 @@
                 extra_records=flags.extra_records,
                 ignored_fields=ignored_field_names,
                 detailed_logger=detailed_logger,
+                configured_catalog=configured_catalog,
             )

     @pytest.fixture(name="should_validate_schema")
@@ -1081,6 +1093,7 @@ async def test_read(
                 flags=expect_records_config,
                 ignored_fields=ignored_fields,
                 detailed_logger=detailed_logger,
+                configured_catalog=configured_catalog,
             )

         if should_validate_stream_statuses:
@@ -1142,8 +1155,45 @@ def compare_records(
         extra_records: bool,
         ignored_fields: List[str],
         detailed_logger: Logger,
+        configured_catalog: ConfiguredAirbyteCatalog,
     ):
         """Compare records using combination of restrictions"""
+        configured_streams = [stream for stream in configured_catalog.streams if stream.stream.name == stream_name]
+        if len(configured_streams) != 1:
+            raise ValueError(f"Expected exactly one stream matching name {stream_name} but got {len(configured_streams)}")
+
+        configured_stream = configured_streams[0]
+        if configured_stream.stream.source_defined_primary_key:
+            # As part of the migration for relaxing CATs, we are starting only with the streams that define primary keys.
+            expected_primary_keys = list(_extract_primary_keys(expected, configured_stream.stream.source_defined_primary_key))
+            actual_primary_keys = list(_extract_primary_keys(actual, configured_stream.stream.source_defined_primary_key))
+            if exact_order:
+                assert (
+                    actual_primary_keys[: len(expected_primary_keys)] == expected_primary_keys
+                ), f"Expected to see the expected primary keys, in order, in the actual response for stream {stream_name}."
+            else:
+                expected_but_not_found = set(map(make_hashable, expected_primary_keys)).difference(
+                    set(map(make_hashable, actual_primary_keys))
+                )
+                assert (
+                    not expected_but_not_found
+                ), f"Expected to see the expected primary keys in the actual response for stream {stream_name}, but they were not found."
+        else:
+            TestBasicRead.legacy_compare_records(
+                stream_name, actual, expected, extra_fields, exact_order, extra_records, ignored_fields, detailed_logger
+            )
+
+    @staticmethod
+    def legacy_compare_records(
+        stream_name: str,
+        actual: List[Mapping[str, Any]],
+        expected: List[Mapping[str, Any]],
+        extra_fields: bool,
+        exact_order: bool,
+        extra_records: bool,
+        ignored_fields: List[str],
+        detailed_logger: Logger,
+    ):
         if exact_order:
             if ignored_fields:
                 for item in actual:
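In other words, the out-of-order branch above only requires every expected primary key to appear somewhere in the actual records, so extra actual records no longer fail the comparison. A runnable sketch of that set-difference check; this `make_hashable` stand-in is an assumption (the suite's real helper is defined elsewhere and need not look like this):

def make_hashable(pk_values):
    # Stand-in helper: fold the {pk_path: value} dict into something
    # set-friendly. Assumes the primary-key values are hashable scalars.
    return frozenset(pk_values.items())

expected_primary_keys = [{("id",): 1}, {("id",): 2}]
actual_primary_keys = [{("id",): 2}, {("id",): 1}, {("id",): 3}]  # an extra record is fine

expected_but_not_found = set(map(make_hashable, expected_primary_keys)).difference(
    set(map(make_hashable, actual_primary_keys))
)
assert not expected_but_not_found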
connector_acceptance_test/utils/asserts.py
@@ -88,7 +88,10 @@ def verify_records_schema(
     stream_validators = {}
     for stream in catalog.streams:
         schema_to_validate_against = stream.stream.json_schema
-        validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger
+        # We are disabling the strict `NoAdditionalPropertiesValidator` until we have a better plan for schema validation. The consequence
+        # is that we will lack visibility on new fields that are not added at the root level (the root level is validated by Datadog).
+        # validator = NoAdditionalPropertiesValidator if fail_on_extra_columns else Draft7ValidatorWithStrictInteger
+        validator = Draft7ValidatorWithStrictInteger
         stream_validators[stream.stream.name] = validator(schema_to_validate_against, format_checker=CustomFormatChecker())
     stream_errors = defaultdict(dict)
     for record in records:
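The behavioral consequence of the switch above: plain Draft 7 validation accepts undeclared fields by default, while the now-disabled strict validator rejected them. A quick sketch against the `jsonschema` package directly (schema and record are illustrative):

from jsonschema import Draft7Validator

schema = {"type": "object", "properties": {"a": {"type": "string"}}}
record = {"a": "str", "b": "extra_string"}

# Plain Draft 7: the undeclared field "b" passes validation.
assert not list(Draft7Validator(schema).iter_errors(record))

# What the strict validator effectively enforced:
strict_schema = {**schema, "additionalProperties": False}
assert list(Draft7Validator(strict_schema).iter_errors(record))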
pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

 [tool.poetry]
 name = "connector-acceptance-test"
-version = "3.5.0"
+version = "3.6.0"
 description = "Contains acceptance tests for connectors."
 authors = ["Airbyte <contact@airbyte.io>"]
 license = "MIT"
unit_tests/test_asserts.py
@@ -87,57 +87,6 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
     ]


-@pytest.mark.parametrize(
-    "json_schema, record, should_fail",
-    [
-        ({"type": "object", "properties": {"a": {"type": "string"}}}, {"a": "str", "b": "extra_string"}, True),
-        (
-            {"type": "object", "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"]}}},
-            {"a": "str", "some_obj": {"b": "extra_string"}},
-            False,
-        ),
-        (
-            {
-                "type": "object",
-                "properties": {"a": {"type": "string"}, "some_obj": {"type": ["null", "object"], "properties": {"a": {"type": "string"}}}},
-            },
-            {"a": "str", "some_obj": {"a": "str", "b": "extra_string"}},
-            True,
-        ),
-        (
-            {"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "array", "items": {"type": "object"}}}},
-            {"a": "str", "b": [{"a": "extra_string"}]},
-            False,
-        ),
-        (
-            {
-                "type": "object",
-                "properties": {
-                    "a": {"type": "string"},
-                    "b": {"type": "array", "items": {"type": "object", "properties": {"a": {"type": "string"}}}},
-                },
-            },
-            {"a": "str", "b": [{"a": "string", "b": "extra_string"}]},
-            True,
-        ),
-    ],
-    ids=[
-        "simple_schema_and_record_with_extra_property",
-        "schema_with_object_without_properties_and_record_with_object_with_property",
-        "schema_with_object_with_properties_and_record_with_object_with_extra_property",
-        "schema_with_array_of_objects_without_properties_and_record_with_array_of_objects_with_property",
-        "schema_with_array_of_objects_with_properties_and_record_with_array_of_objects_with_extra_property",
-    ],
-)
-def test_verify_records_schema_with_fail_on_extra_columns(configured_catalog: ConfiguredAirbyteCatalog, json_schema, record, should_fail):
-    """Test that fail_on_extra_columns works correctly with nested objects and arrays of objects"""
-    configured_catalog.streams[0].stream.json_schema = json_schema
-    records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
-    streams_with_errors = verify_records_schema(records, configured_catalog, fail_on_extra_columns=True)
-    errors = [error.message for error in streams_with_errors["my_stream"].values()]
-    assert errors if should_fail else not errors


 @pytest.mark.parametrize(
     "record, configured_catalog, valid",
     [