diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index 1a48d92149dc..299a347192b2 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -377,7 +377,8 @@ This command runs the Python tests for a airbyte-ci poetry package. | Version | PR | Description | |---------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------| -| 0.2.1 | [#28897](https://github.com/airbytehq/airbyte/pull/28897) | Sentry: Ignore error logs without exceptions from reporting | +| 0.2.2 | [#28897](https://github.com/airbytehq/airbyte/pull/28897) | Sentry: Ignore error logs without exceptions from reporting | +| 0.2.1 | [#28767](https://github.com/airbytehq/airbyte/pull/28767) | Improve pytest step result evaluation to prevent false negative/positive. | | 0.2.0 | [#28857](https://github.com/airbytehq/airbyte/pull/28857) | Add the `airbyte-ci tests` command to run the test suite on any `airbyte-ci` poetry package. | | 0.1.1 | [#28858](https://github.com/airbytehq/airbyte/pull/28858) | Increase the max duration of Connector Package install to 20mn. | | 0.1.0 | | Alpha version not in production yet. All the commands described in this doc are available. | diff --git a/airbyte-ci/connectors/pipelines/pipelines/actions/environments.py b/airbyte-ci/connectors/pipelines/pipelines/actions/environments.py index 77cb079d0fdf..b5b48f582f1a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/actions/environments.py +++ b/airbyte-ci/connectors/pipelines/pipelines/actions/environments.py @@ -10,12 +10,10 @@ import json import re import uuid -from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Callable, List, Optional import toml -import yaml from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret from dagger.engine._version import CLI_VERSION as dagger_engine_version from pipelines import consts @@ -502,42 +500,6 @@ def with_docker_cli(context: ConnectorContext) -> Container: return with_bound_docker_host(context, docker_cli) -async def with_connector_acceptance_test(context: ConnectorContext, connector_under_test_image_tar: File) -> Container: - """Create a container to run connector acceptance tests, bound to a persistent docker host. - - Args: - context (ConnectorContext): The current connector context. - connector_under_test_image_tar (File): The file containing the tar archive the image of the connector under test. - Returns: - Container: A container with connector acceptance tests installed. - """ - test_input = await context.get_connector_dir() - cat_config = yaml.safe_load(await test_input.file("acceptance-test-config.yml").contents()) - - image_sha = await load_image_to_docker_host(context, connector_under_test_image_tar, cat_config["connector_image"]) - - if context.connector_acceptance_test_image.endswith(":dev"): - cat_container = context.connector_acceptance_test_source_dir.docker_build() - else: - cat_container = context.dagger_client.container().from_(context.connector_acceptance_test_image) - - return ( - with_bound_docker_host(context, cat_container) - .with_entrypoint([]) - .with_exec(["pip", "install", "pytest-custom_exit_code"]) - .with_mounted_directory("/test_input", test_input) - .with_env_variable("CONNECTOR_IMAGE_ID", image_sha) - # This bursts the CAT cached results everyday. - # It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT. - # We keep the guarantee that a CAT runs everyday. - .with_env_variable("CACHEBUSTER", datetime.utcnow().strftime("%Y%m%d")) - .with_workdir("/test_input") - .with_entrypoint(["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "--suppress-tests-failed-exit-code"]) - .with_(mounted_connector_secrets(context, "/test_input/secrets")) - .with_exec(["--acceptance-test-config", "/test_input"]) - ) - - def with_gradle( context: ConnectorContext, sources_to_include: List[str] = None, diff --git a/airbyte-ci/connectors/pipelines/pipelines/bases.py b/airbyte-ci/connectors/pipelines/pipelines/bases.py index 124c9d813cc4..aa9220eebba7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/bases.py +++ b/airbyte-ci/connectors/pipelines/pipelines/bases.py @@ -18,14 +18,13 @@ import anyio import asyncer from anyio import Path -from pipelines import sentry_utils - from connector_ops.utils import console from dagger import Container, DaggerError, QueryError from jinja2 import Environment, PackageLoader, select_autoescape +from pipelines import sentry_utils from pipelines.actions import remote_storage from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH -from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify +from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result from rich.console import Group from rich.panel import Panel from rich.style import Style @@ -56,26 +55,6 @@ class StepStatus(Enum): FAILURE = "Failed" SKIPPED = "Skipped" - def from_exit_code(exit_code: int) -> StepStatus: - """Map an exit code to a step status. - - Args: - exit_code (int): A process exit code. - - Raises: - ValueError: Raised if the exit code is not mapped to a step status. - - Returns: - StepStatus: The step status inferred from the exit code. - """ - if exit_code == 0: - return StepStatus.SUCCESS - # pytest returns a 5 exit code when no test is found. - elif exit_code == 5: - return StepStatus.SKIPPED - else: - return StepStatus.FAILURE - def get_rich_style(self) -> Style: """Match color used in the console output to the step status.""" if self is StepStatus.SUCCESS: @@ -104,6 +83,8 @@ class Step(ABC): title: ClassVar[str] max_retries: ClassVar[int] = 0 should_log: ClassVar[bool] = True + success_exit_code: ClassVar[int] = 0 + skipped_exit_code: ClassVar[int] = None # The max duration of a step run. If the step run for more than this duration it will be considered as timed out. # The default of 5 hours is arbitrary and can be changed if needed. max_duration: ClassVar[timedelta] = timedelta(hours=5) @@ -124,19 +105,23 @@ def run_duration(self) -> timedelta: @property def logger(self) -> logging.Logger: if self.should_log: - return self.context.logger + return logging.getLogger(f"{self.context.pipeline_name} - {self.title}") else: disabled_logger = logging.getLogger() disabled_logger.disabled = True return disabled_logger + @property + def dagger_client(self) -> Container: + return self.context.dagger_client.pipeline(self.title) + async def log_progress(self, completion_event: anyio.Event) -> None: """Log the step progress every 30 seconds until the step is done.""" while not completion_event.is_set(): duration = datetime.utcnow() - self.started_at elapsed_seconds = duration.total_seconds() if elapsed_seconds > 30 and round(elapsed_seconds) % 30 == 0: - self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})") + self.logger.info(f"⏳ Still running... (duration: {format_duration(duration)})") await anyio.sleep(1) async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs) -> StepResult: @@ -174,7 +159,7 @@ async def run(self, *args, **kwargs) -> StepResult: if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0: self.retry_count += 1 await anyio.sleep(10) - self.logger.warn(f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}.") + self.logger.warn(f"Retry #{self.retry_count}.") return await self.run(*args, **kwargs) self.stopped_at = datetime.utcnow() self.log_step_result(result) @@ -192,11 +177,11 @@ def log_step_result(self, result: StepResult) -> None: """ duration = format_duration(self.run_duration) if result.status is StepStatus.FAILURE: - self.logger.error(f"{result.status.get_emoji()} {self.title} failed (duration: {duration})") + self.logger.error(f"{result.status.get_emoji()} failed (duration: {duration})") if result.status is StepStatus.SKIPPED: - self.logger.info(f"{result.status.get_emoji()} {self.title} was skipped (duration: {duration})") + self.logger.info(f"{result.status.get_emoji()} was skipped (duration: {duration})") if result.status is StepStatus.SUCCESS: - self.logger.info(f"{result.status.get_emoji()} {self.title} was successful (duration: {duration})") + self.logger.info(f"{result.status.get_emoji()} was successful (duration: {duration})") @abstractmethod async def _run(self, *args, **kwargs) -> StepResult: @@ -218,6 +203,28 @@ def skip(self, reason: str = None) -> StepResult: """ return StepResult(self, StepStatus.SKIPPED, stdout=reason) + def get_step_status_from_exit_code( + self, + exit_code: int, + ) -> StepStatus: + """Map an exit code to a step status. + + Args: + exit_code (int): A process exit code. + + Raises: + ValueError: Raised if the exit code is not mapped to a step status. + + Returns: + StepStatus: The step status inferred from the exit code. + """ + if exit_code == self.success_exit_code: + return StepStatus.SUCCESS + elif self.skipped_exit_code is not None and exit_code == self.skipped_exit_code: + return StepStatus.SKIPPED + else: + return StepStatus.FAILURE + async def get_step_result(self, container: Container) -> StepResult: """Concurrent retrieval of exit code, stdout and stdout of a container. @@ -232,7 +239,7 @@ async def get_step_result(self, container: Container) -> StepResult: exit_code, stdout, stderr = await get_exec_result(container) return StepResult( self, - StepStatus.from_exit_code(exit_code), + self.get_step_status_from_exit_code(exit_code), stderr=stderr, stdout=stdout, output_artifact=container, @@ -249,31 +256,7 @@ def _get_timed_out_step_result(self) -> StepResult: class PytestStep(Step, ABC): """An abstract class to run pytest tests and evaluate success or failure according to pytest logs.""" - async def write_log_file(self, logs) -> str: - """Return the path to the pytest log file.""" - log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs") - await log_directory.mkdir(exist_ok=True) - log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}.log").resolve() - await log_path.write_text(logs) - self.logger.info(f"Pytest logs written to {log_path}") - - # TODO this is not very robust if pytest crashes and does not outputs its expected last log line. - def pytest_logs_to_step_result(self, logs: str) -> StepResult: - """Parse pytest log and infer failure, success or skipping. - - Args: - logs (str): The pytest logs. - - Returns: - StepResult: The inferred step result according to the log. - """ - last_log_line = logs.split("\n")[-2] - if "failed" in last_log_line or "errors" in last_log_line: - return StepResult(self, StepStatus.FAILURE, stderr=logs) - elif "no tests ran" in last_log_line: - return StepResult(self, StepStatus.SKIPPED, stdout=logs) - else: - return StepResult(self, StepStatus.SUCCESS, stdout=logs) + skipped_exit_code = 5 async def _run_tests_in_directory(self, connector_under_test: Container, test_directory: str) -> StepResult: """Run the pytest tests in the test_directory that was passed. @@ -294,18 +277,13 @@ async def _run_tests_in_directory(self, connector_under_test: Container, test_di "python", "-m", "pytest", - "--suppress-tests-failed-exit-code", - "--suppress-no-test-exit-code", "-s", test_directory, "-c", test_config, ] ) - logs = await tester.stdout() - if self.context.is_local: - await self.write_log_file(logs) - return self.pytest_logs_to_step_result(logs) + return await self.get_step_result(tester) else: return StepResult(self, StepStatus.SKIPPED) diff --git a/airbyte-ci/connectors/pipelines/pipelines/format/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/format/java_connectors.py index 38feae7eeeb8..7d73f3ab40fb 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/format/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/format/java_connectors.py @@ -4,7 +4,7 @@ from pipelines.actions import environments -from pipelines.bases import StepResult, StepStatus +from pipelines.bases import StepResult from pipelines.gradle import GradleTask from pipelines.utils import get_exec_result @@ -25,7 +25,7 @@ async def _run(self) -> StepResult: exit_code, stdout, stderr = await get_exec_result(formatted) return StepResult( self, - StepStatus.from_exit_code(exit_code), + self.get_step_status_from_exit_code(exit_code), stderr=stderr, stdout=stdout, output_artifact=formatted.directory(str(self.context.connector.code_directory)), diff --git a/airbyte-ci/connectors/pipelines/pipelines/format/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/format/python_connectors.py index 9e033d93f743..e2ebbcf68d8a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/format/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/format/python_connectors.py @@ -4,7 +4,7 @@ import asyncer from pipelines.actions import environments -from pipelines.bases import Step, StepResult, StepStatus +from pipelines.bases import Step, StepResult from pipelines.utils import with_exit_code, with_stderr, with_stdout @@ -50,7 +50,7 @@ async def _run(self) -> StepResult: return StepResult( self, - StepStatus.from_exit_code(soon_exit_code.value), + self.get_step_status_from_exit_code(await soon_exit_code), stderr=soon_stderr.value, stdout=soon_stdout.value, output_artifact=formatted.directory("/connector_code"), diff --git a/airbyte-ci/connectors/pipelines/pipelines/hacks.py b/airbyte-ci/connectors/pipelines/pipelines/hacks.py index 5f2d271249b6..0557786d5266 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/hacks.py +++ b/airbyte-ci/connectors/pipelines/pipelines/hacks.py @@ -7,7 +7,7 @@ from __future__ import annotations from copy import deepcopy -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Callable, List import requests import yaml @@ -15,8 +15,8 @@ from dagger import DaggerError if TYPE_CHECKING: + from dagger import Client, Container, Directory from pipelines.contexts import ConnectorContext - from dagger import Client, Directory LINES_TO_REMOVE_FROM_GRADLE_FILE = [ @@ -140,3 +140,28 @@ async def cache_latest_cdk(dagger_client: Client, pip_cache_volume_name: str = " .with_exec(["pip", "install", "--force-reinstall", f"airbyte-cdk=={cdk_latest_version}"]) .sync() ) + + +def never_fail_exec(command: List[str]) -> Callable: + """ + Wrap a command execution with some bash sugar to always exit with a 0 exit code but write the actual exit code to a file. + + Underlying issue: + When a classic dagger with_exec is returning a >0 exit code an ExecError is raised. + It's OK for the majority of our container interaction. + But some execution, like running CAT, are expected to often fail. + In CAT we don't want ExecError to be raised on container interaction because CAT might write updated secrets that we need to pull from the container after the test run. + The bash trick below is a hack to always return a 0 exit code but write the actual exit code to a file. + The file is then read by the pipeline to determine the exit code of the container. + + Args: + command (List[str]): The command to run in the container. + + Returns: + Callable: _description_ + """ + + def never_fail_exec_inner(container: Container): + return container.with_exec(["sh", "-c", f"{' '.join(command)}; echo $? > /exit_code"]) + + return never_fail_exec_inner diff --git a/airbyte-ci/connectors/pipelines/pipelines/tests/common.py b/airbyte-ci/connectors/pipelines/pipelines/tests/common.py index 1da5c1593fb8..bdcfddf26435 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/tests/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/tests/common.py @@ -4,19 +4,20 @@ """This module groups steps made to run tests agnostic to a connector language.""" +import datetime from abc import ABC, abstractmethod from functools import cached_property -from typing import ClassVar, Optional +from typing import ClassVar, List, Optional -import asyncer import requests import semver import yaml +from connector_ops.utils import Connector +from dagger import Container, File +from pipelines import hacks from pipelines.actions import environments from pipelines.bases import CIContext, PytestStep, Step, StepResult, StepStatus from pipelines.utils import METADATA_FILE_NAME -from connector_ops.utils import Connector -from dagger import File class VersionCheck(Step, ABC): @@ -176,8 +177,22 @@ class AcceptanceTests(PytestStep): """A step to run acceptance tests for a connector if it has an acceptance test config file.""" title = "Acceptance tests" + CONTAINER_TEST_INPUT_DIRECTORY = "/test_input" + CONTAINER_SECRETS_DIRECTORY = "/test_input/secrets" + + @property + def cat_command(self) -> List[str]: + return [ + "python", + "-m", + "pytest", + "-p", + "connector_acceptance_test.plugin", + "--acceptance-test-config", + self.CONTAINER_TEST_INPUT_DIRECTORY, + ] - async def _run(self, connector_under_test_image_tar: Optional[File]) -> StepResult: + async def _run(self, connector_under_test_image_tar: File) -> StepResult: """Run the acceptance test suite on a connector dev image. Build the connector acceptance test image if the tag is :dev. Args: @@ -189,19 +204,57 @@ async def _run(self, connector_under_test_image_tar: Optional[File]) -> StepResu if not self.context.connector.acceptance_test_config: return StepResult(self, StepStatus.SKIPPED) - cat_container = await environments.with_connector_acceptance_test(self.context, connector_under_test_image_tar) - secret_dir = cat_container.directory("/test_input/secrets") + cat_container = await self._build_connector_acceptance_test(connector_under_test_image_tar) + cat_container = cat_container.with_(hacks.never_fail_exec(self.cat_command)) - async with asyncer.create_task_group() as task_group: - soon_secret_files = task_group.soonify(secret_dir.entries)() - soon_cat_container_stdout = task_group.soonify(cat_container.stdout)() + step_result = await self.get_step_result(cat_container) + secret_dir = cat_container.directory(self.CONTAINER_SECRETS_DIRECTORY) - if secret_files := soon_secret_files.value: + if secret_files := await secret_dir.entries(): for file_path in secret_files: if file_path.startswith("updated_configurations"): self.context.updated_secrets_dir = secret_dir break - logs = soon_cat_container_stdout.value - if self.context.is_local: - await self.write_log_file(logs) - return self.pytest_logs_to_step_result(logs) + return step_result + + def get_cache_buster(self) -> str: + """ + This bursts the CAT cached results everyday. + It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT. + We keep the guarantee that a CAT runs everyday. + + Returns: + str: A string representing the current date. + """ + return datetime.datetime.utcnow().strftime("%Y%m%d") + + async def _build_connector_acceptance_test(self, connector_under_test_image_tar: File) -> Container: + """Create a container to run connector acceptance tests, bound to a persistent docker host. + + Args: + connector_under_test_image_tar (File): The file containing the tar archive the image of the connector under test. + Returns: + Container: A container with connector acceptance tests installed. + """ + test_input = await self.context.get_connector_dir() + cat_config = yaml.safe_load(await test_input.file("acceptance-test-config.yml").contents()) + + image_sha = await environments.load_image_to_docker_host( + self.context, connector_under_test_image_tar, cat_config["connector_image"] + ) + + if self.context.connector_acceptance_test_image.endswith(":dev"): + cat_container = self.context.connector_acceptance_test_source_dir.docker_build() + else: + cat_container = self.dagger_client.container().from_(self.context.connector_acceptance_test_image) + + return ( + environments.with_bound_docker_host(self.context, cat_container) + .with_entrypoint([]) + .with_mounted_directory(self.CONTAINER_TEST_INPUT_DIRECTORY, test_input) + .with_env_variable("CONNECTOR_IMAGE_ID", image_sha) + .with_env_variable("CACHEBUSTER", self.get_cache_buster()) + .with_workdir(self.CONTAINER_TEST_INPUT_DIRECTORY) + .with_exec(["mkdir", "-p", self.CONTAINER_SECRETS_DIRECTORY]) + .with_(environments.mounted_connector_secrets(self.context, self.CONTAINER_SECRETS_DIRECTORY)) + ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/utils.py b/airbyte-ci/connectors/pipelines/pipelines/utils.py index ba397fb9e4f0..61f7329503a7 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/utils.py +++ b/airbyte-ci/connectors/pipelines/pipelines/utils.py @@ -12,27 +12,26 @@ import re import sys import unicodedata - from glob import glob +from io import TextIOWrapper from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, List, Optional, Set, Tuple, Union -from io import TextIOWrapper import anyio import asyncer import click import git -from pipelines import consts, main_logger, sentry_utils -from pipelines.consts import GCS_PUBLIC_DOMAIN from connector_ops.utils import get_all_released_connectors, get_changed_connectors from dagger import Client, Config, Connection, Container, DaggerError, ExecError, File, ImageLayerCompression, QueryError, Secret from google.cloud import storage from google.oauth2 import service_account from more_itertools import chunked +from pipelines import consts, main_logger, sentry_utils +from pipelines.consts import GCS_PUBLIC_DOMAIN if TYPE_CHECKING: - from pipelines.contexts import ConnectorContext from github import PullRequest + from pipelines.contexts import ConnectorContext DAGGER_CONFIG = Config(log_output=sys.stderr) AIRBYTE_REPO_URL = "https://github.com/airbytehq/airbyte.git" @@ -152,6 +151,9 @@ async def get_exec_result(container: Container) -> Tuple[int, str, str]: ExecError to handle errors. This is offered as a convenience when the exit code value is actually needed. + If the container has a file at /exit_code, the exit code will be read from it. + See hacks.never_fail_exec for more details. + Args: container (Container): The container to execute. @@ -159,7 +161,11 @@ async def get_exec_result(container: Container) -> Tuple[int, str, str]: Tuple[int, str, str]: The exit_code, stdout and stderr of the container, respectively. """ try: - return 0, *(await get_container_output(container)) + exit_code = 0 + in_file_exit_code = await get_file_contents(container, "/exit_code") + if in_file_exit_code: + exit_code = int(in_file_exit_code) + return exit_code, *(await get_container_output(container)) except ExecError as e: return e.exit_code, e.stdout, e.stderr diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index 6ca7626be6e7..0b8c327260c1 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "0.2.1" +version = "0.2.2" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/tests/tests/__init__.py b/airbyte-ci/connectors/pipelines/tests/tests/__init__.py new file mode 100644 index 000000000000..c941b3045795 --- /dev/null +++ b/airbyte-ci/connectors/pipelines/tests/tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-ci/connectors/pipelines/tests/tests/test_common.py b/airbyte-ci/connectors/pipelines/tests/tests/test_common.py new file mode 100644 index 000000000000..b46733f4419c --- /dev/null +++ b/airbyte-ci/connectors/pipelines/tests/tests/test_common.py @@ -0,0 +1,199 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# +import pathlib +import time +from typing import List +from unittest.mock import MagicMock + +import dagger +import pytest +import yaml +from pipelines.bases import StepStatus +from pipelines.tests import common + +pytestmark = [ + pytest.mark.anyio, +] + + +class TestAcceptanceTests: + @staticmethod + def get_dummy_cat_container(dagger_client: dagger.Client, exit_code: int, secret_file_paths: List, stdout: str, stderr: str): + secret_file_paths = secret_file_paths or [] + container = ( + dagger_client.container() + .from_("bash:latest") + .with_exec(["mkdir", "-p", common.AcceptanceTests.CONTAINER_TEST_INPUT_DIRECTORY]) + .with_exec(["mkdir", "-p", common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY]) + ) + + for secret_file_path in secret_file_paths: + secret_dir_name = str(pathlib.Path(secret_file_path).parent) + container = container.with_exec(["mkdir", "-p", secret_dir_name]) + container = container.with_exec(["sh", "-c", f"echo foo > {secret_file_path}"]) + return container.with_new_file("/stupid_bash_script.sh", f"echo {stdout}; echo {stderr} >&2; exit {exit_code}") + + @pytest.fixture + def test_context(self, dagger_client): + return MagicMock(connector=MagicMock(), dagger_client=dagger_client) + + async def test_skipped_when_no_acceptance_test_config(self, test_context): + test_context.connector.acceptance_test_config = None + acceptance_test_step = common.AcceptanceTests(test_context) + step_result = await acceptance_test_step._run(None) + assert step_result.status == StepStatus.SKIPPED + + @pytest.mark.parametrize( + "exit_code,expected_status,secrets_file_names,expect_updated_secrets", + [ + (0, StepStatus.SUCCESS, [], False), + (1, StepStatus.FAILURE, [], False), + (2, StepStatus.FAILURE, [], False), + (common.AcceptanceTests.skipped_exit_code, StepStatus.SKIPPED, [], False), + (0, StepStatus.SUCCESS, [f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json"], False), + (1, StepStatus.FAILURE, [f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json"], False), + (2, StepStatus.FAILURE, [f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json"], False), + ( + common.AcceptanceTests.skipped_exit_code, + StepStatus.SKIPPED, + [f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json"], + False, + ), + ( + 0, + StepStatus.SUCCESS, + [ + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json", + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/updated_configurations/updated_config.json", + ], + True, + ), + ( + 1, + StepStatus.FAILURE, + [ + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json", + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/updated_configurations/updated_config.json", + ], + True, + ), + ( + 2, + StepStatus.FAILURE, + [ + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json", + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/updated_configurations/updated_config.json", + ], + True, + ), + ( + common.AcceptanceTests.skipped_exit_code, + StepStatus.SKIPPED, + [ + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/config.json", + f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}/updated_configurations/updated_config.json", + ], + True, + ), + ], + ) + async def test__run( + self, test_context, mocker, exit_code: int, expected_status: StepStatus, secrets_file_names: List, expect_updated_secrets: bool + ): + """Test the behavior of the run function using a dummy container.""" + cat_container = self.get_dummy_cat_container( + test_context.dagger_client, exit_code, secrets_file_names, stdout="hello", stderr="world" + ) + async_mock = mocker.AsyncMock(return_value=cat_container) + mocker.patch.object(common.AcceptanceTests, "_build_connector_acceptance_test", side_effect=async_mock) + mocker.patch.object(common.AcceptanceTests, "cat_command", ["bash", "/stupid_bash_script.sh"]) + acceptance_test_step = common.AcceptanceTests(test_context) + step_result = await acceptance_test_step._run(None) + assert step_result.status == expected_status + assert step_result.stdout.strip() == "hello" + assert step_result.stderr.strip() == "world" + if expect_updated_secrets: + assert ( + await test_context.updated_secrets_dir.entries() + == await cat_container.directory(f"{common.AcceptanceTests.CONTAINER_SECRETS_DIRECTORY}").entries() + ) + assert any("updated_configurations" in str(file_name) for file_name in await test_context.updated_secrets_dir.entries()) + + @pytest.fixture + def test_input_dir(self, dagger_client, tmpdir): + with open(tmpdir / "acceptance-test-config.yml", "w") as f: + yaml.safe_dump({"connector_image": "airbyte/connector_under_test_image:dev"}, f) + return dagger_client.host().directory(str(tmpdir)) + + def get_patched_acceptance_test_step(self, dagger_client, mocker, test_context, test_input_dir): + test_context.get_connector_dir = mocker.AsyncMock(return_value=test_input_dir) + test_context.connector_acceptance_test_image = "bash:latest" + test_context.connector_secrets = {"config.json": dagger_client.set_secret("config.json", "connector_secret")} + + mocker.patch.object(common.environments, "load_image_to_docker_host", return_value="image_sha") + mocker.patch.object(common.environments, "with_bound_docker_host", lambda _, cat_container: cat_container) + mocker.patch.object(common.AcceptanceTests, "get_cache_buster", return_value="cache_buster") + return common.AcceptanceTests(test_context) + + async def test_cat_container_provisioning(self, dagger_client, mocker, test_context, test_input_dir): + """Check that the acceptance test container is correctly provisioned. + We check that: + - the test input and secrets are correctly mounted. + - the cache buster and image sha are correctly set as environment variables. + - that the entrypoint is correctly set. + - the current working directory is correctly set. + """ + acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir) + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + assert await cat_container.entrypoint() == [] + assert (await cat_container.with_exec(["pwd"]).stdout()).strip() == acceptance_test_step.CONTAINER_TEST_INPUT_DIRECTORY + test_input_ls_result = await cat_container.with_exec(["ls"]).stdout() + assert all( + file_or_directory in test_input_ls_result.splitlines() for file_or_directory in ["secrets", "acceptance-test-config.yml"] + ) + assert await cat_container.with_exec(["cat", f"{acceptance_test_step.CONTAINER_SECRETS_DIRECTORY}/config.json"]).stdout() == "***" + env_vars = {await env_var.name(): await env_var.value() for env_var in await cat_container.env_variables()} + assert env_vars["CACHEBUSTER"] == "cache_buster" + assert env_vars["CONNECTOR_IMAGE_ID"] == "image_sha" + + async def test_cat_container_caching(self, dagger_client, mocker, test_context, test_input_dir): + """Check that the acceptance test container caching behavior is correct.""" + + acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context, test_input_dir) + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + cat_container = cat_container.with_exec(["date"]) + fist_date_result = await cat_container.stdout() + + time.sleep(1) + # Check that cache is used + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + cat_container = cat_container.with_exec(["date"]) + second_date_result = await cat_container.stdout() + assert fist_date_result == second_date_result + + time.sleep(1) + # Check that cache buster is used to invalidate the cache + previous_cache_buster_value = acceptance_test_step.get_cache_buster() + new_cache_buster_value = previous_cache_buster_value + "1" + mocker.patch.object(common.AcceptanceTests, "get_cache_buster", return_value=new_cache_buster_value) + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + cat_container = cat_container.with_exec(["date"]) + third_date_result = await cat_container.stdout() + assert third_date_result != second_date_result + + time.sleep(1) + # Check that image sha is used to invalidate the cache + previous_image_sha_value = await common.environments.load_image_to_docker_host("foo", "bar", "baz") + mocker.patch.object(common.environments, "load_image_to_docker_host", return_value=previous_image_sha_value + "1") + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + cat_container = cat_container.with_exec(["date"]) + fourth_date_result = await cat_container.stdout() + assert fourth_date_result != third_date_result + + time.sleep(1) + # Check the cache is used again + cat_container = await acceptance_test_step._build_connector_acceptance_test("connector_under_test_image_tar") + cat_container = cat_container.with_exec(["date"]) + fifth_date_result = await cat_container.stdout() + assert fifth_date_result == fourth_date_result