Live tests: GHA to run both validation & regression tests (#38816)
Co-authored-by: Augustin <augustin@airbyte.io>
clnoll and alafanechere authored Jun 20, 2024
1 parent af78176 commit c7ecc41
Showing 13 changed files with 500 additions and 65 deletions.
105 changes: 105 additions & 0 deletions .github/workflows/live_tests.yml
@@ -0,0 +1,105 @@
name: Connector CI - Run Live Validation Tests

concurrency:
# This is the name of the concurrency group. It is used to prevent concurrent runs of the same workflow.
#
# - github.head_ref is only defined on PR runs; it makes the concurrency group
#   unique per pull request, ensuring that only one run per pull request is active at a time.
#
# - github.run_id is defined on all runs; it makes the concurrency group unique for
#   workflow dispatches, which allows multiple workflow dispatches to run in parallel.
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

on:
workflow_dispatch:
inputs:
connector_name:
description: Connector name (e.g. source-faker)
required: true
connection_id:
description: ID of the connection to test; use "auto" to let the connection retriever choose a connection
required: true
pr_url:
description: URL of the PR containing the code change
required: true
streams:
description: Streams to include in tests (comma-separated)
use_local_cdk:
description: Use the local CDK when building the target connector
default: "false"
type: boolean

jobs:
live_tests:
name: Live Tests
runs-on: connector-test-large
timeout-minutes: 360 # 6 hours
steps:
- name: Checkout Airbyte
uses: actions/checkout@v4
- name: Check PAT rate limits
run: |
./tools/bin/find_non_rate_limited_PAT \
${{ secrets.GH_PAT_BUILD_RUNNER_OSS }} \
${{ secrets.GH_PAT_BUILD_RUNNER_BACKUP }}
- name: Extract branch name [WORKFLOW DISPATCH]
shell: bash
if: github.event_name == 'workflow_dispatch'
run: echo "branch=${GITHUB_REF#refs/heads/}" >> $GITHUB_OUTPUT
id: extract_branch

- name: Install Poetry
id: install_poetry
uses: snok/install-poetry@v1

- name: Make poetry venv in project
id: poetry_venv
run: poetry config virtualenvs.in-project true

- name: Install Python packages
id: install_python_packages
working-directory: airbyte-ci/connectors/pipelines
run: poetry install

- name: Fetch last commit id from remote branch [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch'
id: fetch_last_commit_id_wd
run: echo "commit_id=$(git rev-parse origin/${{ steps.extract_branch.outputs.branch }})" >> $GITHUB_OUTPUT

- name: Setup Stream Parameters
if: github.event_name == 'workflow_dispatch'
run: |
if [ -z "${{ github.event.inputs.streams }}" ]; then
echo "STREAM_PARAMS=" >> $GITHUB_ENV
else
STREAMS=$(echo "${{ github.event.inputs.streams }}" | sed 's/,/ --connector_live_tests.selected-streams=/g')
echo "STREAM_PARAMS=--connector_live_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
fi
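# Example of the expansion above (illustrative): a streams input of "users,purchases" yields
# STREAM_PARAMS="--connector_live_tests.selected-streams=users --connector_live_tests.selected-streams=purchases"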
- name: Setup Local CDK Flag
if: github.event_name == 'workflow_dispatch'
run: |
if ${{ github.event.inputs.use_local_cdk }}; then
echo "USE_LOCAL_CDK_FLAG=--use-local-cdk" >> $GITHUB_ENV
else
echo "USE_LOCAL_CDK_FLAG=" >> $GITHUB_ENV
fi
- name: Run Live Tests [WORKFLOW DISPATCH]
if: github.event_name == 'workflow_dispatch' # TODO: consider using the matrix strategy (https://docs.github.com/en/actions/using-jobs/using-a-matrix-for-your-jobs). See https://github.com/airbytehq/airbyte/pull/37659#discussion_r1583380234 for details.
uses: ./.github/actions/run-airbyte-ci
with:
context: "manual"
dagger_cloud_token: ${{ secrets.DAGGER_CLOUD_TOKEN_2 }}
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
gcp_integration_tester_credentials: ${{ secrets.GCLOUD_INTEGRATION_TESTER }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
git_branch: ${{ steps.extract_branch.outputs.branch }}
git_revision: ${{ steps.fetch_last_commit_id_wd.outputs.commit_id }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=all --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }} --connector_live_tests.test-evaluation-mode=diagnostic
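The workflow_dispatch inputs above map one-to-one onto GitHub's standard workflow-dispatch REST API. A minimal Python sketch of triggering this workflow (the token, ref, PR number, and input values are hypothetical placeholders):

    import requests

    resp = requests.post(
        "https://api.github.com/repos/airbytehq/airbyte/actions/workflows/live_tests.yml/dispatches",
        headers={
            "Authorization": "Bearer <token>",  # hypothetical PAT with workflow scope
            "Accept": "application/vnd.github+json",
        },
        json={
            "ref": "master",  # branch to run against (assumed here)
            "inputs": {
                "connector_name": "source-faker",
                "connection_id": "auto",
                "pr_url": "https://github.com/airbytehq/airbyte/pull/<pr-number>",
                "streams": "users,purchases",
                "use_local_cdk": "false",  # dispatch inputs are sent as strings
            },
        },
    )
    resp.raise_for_status()  # GitHub returns 204 No Content on success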
6 changes: 3 additions & 3 deletions .github/workflows/regression_tests.yml
@@ -73,8 +73,8 @@ jobs:
if [ -z "${{ github.event.inputs.streams }}" ]; then
echo "STREAM_PARAMS=" >> $GITHUB_ENV
else
- STREAMS=$(echo "${{ github.event.inputs.streams }}" | sed 's/,/ --connector_regression_tests.selected-streams=/g')
- echo "STREAM_PARAMS=--connector_regression_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
+ STREAMS=$(echo "${{ github.event.inputs.streams }}" | sed 's/,/ --connector_live_tests.selected-streams=/g')
+ echo "STREAM_PARAMS=--connector_live_tests.selected-streams=$STREAMS" >> $GITHUB_ENV
fi
- name: Setup Local CDK Flag
@@ -102,4 +102,4 @@ jobs:
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OSS }}
s3_build_cache_access_key_id: ${{ secrets.SELF_RUNNER_AWS_ACCESS_KEY_ID }}
s3_build_cache_secret_key: ${{ secrets.SELF_RUNNER_AWS_SECRET_ACCESS_KEY }}
- subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_regression_tests --connector_regression_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_regression_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
+ subcommand: connectors ${{ env.USE_LOCAL_CDK_FLAG }} --name ${{ github.event.inputs.connector_name }} test --only-step connector_live_tests --connector_live_tests.test-suite=regression --connector_live_tests.connection-id=${{ github.event.inputs.connection_id }} --connector_live_tests.pr-url=${{ github.event.inputs.pr_url }} ${{ env.STREAM_PARAMS }}
@@ -18,7 +18,7 @@ class TestEvaluationMode(Enum):
tests only.
The diagnostic mode can be made available to a test using the @pytest.mark.allow_diagnostic_mode decorator,
- and passing in the --validation-test-mode=diagnostic flag.
+ and passing in the --test-evaluation-mode=diagnostic flag.
"""

DIAGNOSTIC = "diagnostic"
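Per the docstring above, a test opts into diagnostic evaluation via the allow_diagnostic_mode marker together with the --test-evaluation-mode=diagnostic flag. A minimal sketch (the test name and body are hypothetical):

    import pytest

    @pytest.mark.allow_diagnostic_mode
    def test_records_conform_to_schema():
        # Eligible for diagnostic evaluation when the suite is invoked with
        # --test-evaluation-mode=diagnostic.
        ...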
@@ -3,7 +3,8 @@
#


- from functools import reduce
+ from enum import Enum
+ from functools import reduce, total_ordering
from typing import Any, Dict, List, Mapping, Optional, Set, Text, Union

import dpath.util
@@ -50,6 +51,22 @@ def parse(self, record: Mapping[str, Any], path: Optional[List[Union[int, str]]]
return self._parse_value(value)


@total_ordering
class ComparableType(Enum):
    NULL = 0
    BOOLEAN = 1
    INTEGER = 2
    NUMBER = 3
    STRING = 4
    OBJECT = 5

    def __lt__(self, other: Any) -> bool:
        if self.__class__ is other.__class__:
            return self.value < other.value  # type: ignore
        return NotImplemented
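# Illustrative ordering checks (example usage, not part of the module):
# @total_ordering derives <=, >, and >= from __lt__ plus Enum's built-in
# __eq__, so "narrower than or equal" reads naturally:
#
#   ComparableType.INTEGER <= ComparableType.NUMBER  # True: int narrows to number
#   ComparableType.STRING <= ComparableType.BOOLEAN  # False: string is wider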


class JsonSchemaHelper:
"""Helper class to simplify schema validation and read of records according to their schema."""

@@ -263,3 +280,88 @@ def get_paths_in_connector_config(schema: dict) -> List[str]:
:returns list of path_in_connector_config paths
"""
return ["/" + "/".join(value["path_in_connector_config"]) for value in schema.values()]


def conforms_to_schema(record: Mapping[str, Any], schema: Mapping[str, Any]) -> bool:
"""
Return true iff the record conforms to the supplied schema.
The record conforms to the supplied schema iff:
- All columns in the record are in the schema.
- For every column in the record, that column's type is equal to or narrower than the same column's
type in the schema.
"""
schema_columns = set(schema.get("properties", {}).keys())
record_columns = set(record.keys())

if not record_columns.issubset(schema_columns):
return False

for column, definition in schema.get("properties", {}).items():
expected_type = definition.get("type")
value = record.get(column)

if value is not None:
if isinstance(expected_type, list):
return any(_is_equal_or_narrower_type(value, e) for e in expected_type)
elif expected_type == "object":
return isinstance(value, dict)
elif expected_type == "array":
if not isinstance(value, list):
return False
array_type = definition.get("items", {}).get("type")
if not all(_is_equal_or_narrower_type(v, array_type) for v in value):
return False
elif not _is_equal_or_narrower_type(value, expected_type):
return False

return True


def _is_equal_or_narrower_type(value: Any, expected_type: str) -> bool:
    if isinstance(value, list):
        # Lists are never compared directly; conforms_to_schema compares the
        # items of an array individually, so a list value can never match a
        # scalar expected type.
        return False

    inferred_type = _get_inferred_type(value)
    comparable_expected_type = _get_comparable_type(expected_type)

    # Unrecognized value types or schema types never conform.
    if inferred_type is None or comparable_expected_type is None:
        return False

    return inferred_type <= comparable_expected_type


def _get_inferred_type(value: Any) -> Optional[ComparableType]:
    if value is None:
        return ComparableType.NULL
    if isinstance(value, bool):  # checked before int: bool is a subclass of int
        return ComparableType.BOOLEAN
    if isinstance(value, int):
        return ComparableType.INTEGER
    if isinstance(value, float):
        return ComparableType.NUMBER
    if isinstance(value, str):
        return ComparableType.STRING
    if isinstance(value, dict):
        return ComparableType.OBJECT
    return None


def _get_comparable_type(value: Any) -> Optional[ComparableType]:
    if value == "null":
        return ComparableType.NULL
    if value == "boolean":
        return ComparableType.BOOLEAN
    if value == "integer":
        return ComparableType.INTEGER
    if value == "number":
        return ComparableType.NUMBER
    if value == "string":
        return ComparableType.STRING
    if value == "object":
        return ComparableType.OBJECT
    return None
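Taken together: a record conforms when every column appears in the schema and each value's inferred type is equal to or narrower than the declared type. A small usage sketch (illustrative; assumes the module is importable as in the test import below):

    from live_tests.commons.json_schema_helper import conforms_to_schema

    schema = {"properties": {"id": {"type": "number"}, "name": {"type": ["null", "string"]}}}

    assert conforms_to_schema({"id": 1, "name": "a"}, schema)  # int is narrower than number
    assert not conforms_to_schema({"id": "oops"}, schema)  # string is not narrower than number
    assert not conforms_to_schema({"id": 1, "extra": 2}, schema)  # extra column not in schema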
10 changes: 5 additions & 5 deletions airbyte-ci/connectors/live-tests/src/live_tests/conftest.py
@@ -43,8 +43,8 @@
from pytest_sugar import SugarTerminalReporter # type: ignore

# CONSTS
LOGGER = logging.getLogger("regression")
MAIN_OUTPUT_DIRECTORY = Path("/tmp/regression_tests_artifacts")
LOGGER = logging.getLogger("live-tests")
MAIN_OUTPUT_DIRECTORY = Path("/tmp/live_tests_artifacts")

# It's used by Dagger and it's very verbose
logging.getLogger("httpx").setLevel(logging.ERROR)
@@ -54,7 +54,7 @@
def pytest_addoption(parser: Parser) -> None:
parser.addoption(
"--connector-image",
help="The connector image name on which the regressions tests will run: e.g. airbyte/source-faker",
help="The connector image name on which the tests will run: e.g. airbyte/source-faker",
)
parser.addoption(
"--control-version",
Expand All @@ -63,7 +63,7 @@ def pytest_addoption(parser: Parser) -> None:
parser.addoption(
"--target-version",
default="dev",
help="The target version used for regression testing. Defaults to dev.",
help="The target version used for regression and validation testing. Defaults to dev.",
)
parser.addoption("--config-path")
parser.addoption("--catalog-path")
@@ -140,7 +140,7 @@ def pytest_configure(config: Config) -> None:
else:
config.stash[stash_keys.SHOULD_READ_WITH_STATE] = prompt_for_read_with_or_without_state()

retrieval_reason = f"Running regression tests on connection for connector {config.stash[stash_keys.CONNECTOR_IMAGE]} on target versions ({config.stash[stash_keys.TARGET_VERSION]})."
retrieval_reason = f"Running live tests on connection for connector {config.stash[stash_keys.CONNECTOR_IMAGE]} on target versions ({config.stash[stash_keys.TARGET_VERSION]})."

try:
config.stash[stash_keys.CONNECTION_OBJECTS] = get_connection_objects(
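The options registered in pytest_addoption above are read back through pytest's config API. A minimal sketch of a fixture consuming one of them (the fixture name is hypothetical):

    import pytest

    @pytest.fixture(scope="session")
    def connector_image(pytestconfig: pytest.Config) -> str:
        # --connector-image is registered in pytest_addoption above.
        return pytestconfig.getoption("--connector-image")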
@@ -7,7 +7,6 @@
from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Tuple

import pytest
- from airbyte_cdk.sources.file_based.schema_helpers import conforms_to_schema
from airbyte_protocol.models import (
AirbyteStateMessage,
AirbyteStateStats,
@@ -16,6 +15,7 @@
AirbyteStreamStatusTraceMessage,
ConfiguredAirbyteCatalog,
)
+ from live_tests.commons.json_schema_helper import conforms_to_schema
from live_tests.commons.models import ExecutionResult
from live_tests.utils import fail_test_on_failing_execution_results, get_test_logger
