Skip to content

Commit

Permalink
connectors-ci: improve pytest result evaluation (#28767)
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere authored and bnchrch committed Aug 3, 2023
1 parent 0ce29c5 commit c7dce72
Show file tree
Hide file tree
Showing 11 changed files with 355 additions and 128 deletions.
3 changes: 2 additions & 1 deletion airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,8 @@ This command runs the Python tests for an airbyte-ci poetry package.

| Version | PR | Description |
|---------|-----------------------------------------------------------|----------------------------------------------------------------------------------------------|
| 0.2.1 | [#28897](https://github.com/airbytehq/airbyte/pull/28897) | Sentry: Ignore error logs without exceptions from reporting |
| 0.2.2 | [#28897](https://github.com/airbytehq/airbyte/pull/28897) | Sentry: Ignore error logs without exceptions from reporting |
| 0.2.1 | [#28767](https://github.com/airbytehq/airbyte/pull/28767) | Improve pytest step result evaluation to prevent false negative/positive. |
| 0.2.0 | [#28857](https://github.com/airbytehq/airbyte/pull/28857) | Add the `airbyte-ci tests` command to run the test suite on any `airbyte-ci` poetry package. |
| 0.1.1 | [#28858](https://github.com/airbytehq/airbyte/pull/28858) | Increase the max duration of Connector Package install to 20mn. |
| 0.1.0 | | Alpha version not in production yet. All the commands described in this doc are available. |
Expand Down
38 changes: 0 additions & 38 deletions airbyte-ci/connectors/pipelines/pipelines/actions/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
import json
import re
import uuid
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, Optional

import toml
import yaml
from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret
from dagger.engine._version import CLI_VERSION as dagger_engine_version
from pipelines import consts
Expand Down Expand Up @@ -502,42 +500,6 @@ def with_docker_cli(context: ConnectorContext) -> Container:
return with_bound_docker_host(context, docker_cli)


async def with_connector_acceptance_test(context: ConnectorContext, connector_under_test_image_tar: File) -> Container:
    """Build the container that runs the connector acceptance tests (CAT), bound to a persistent docker host.

    Args:
        context (ConnectorContext): The current connector context.
        connector_under_test_image_tar (File): The tar archive of the image of the connector under test.

    Returns:
        Container: A container configured to run the connector acceptance tests against the connector directory.
    """
    connector_dir = await context.get_connector_dir()
    raw_cat_config = await connector_dir.file("acceptance-test-config.yml").contents()
    connector_image_name = yaml.safe_load(raw_cat_config)["connector_image"]

    # The value returned here is exposed to the tests through the CONNECTOR_IMAGE_ID environment variable below.
    image_sha = await load_image_to_docker_host(context, connector_under_test_image_tar, connector_image_name)

    # A ":dev" CAT image is built from the local CAT source directory, any other image is pulled by name.
    base_container = (
        context.connector_acceptance_test_source_dir.docker_build()
        if context.connector_acceptance_test_image.endswith(":dev")
        else context.dagger_client.container().from_(context.connector_acceptance_test_image)
    )

    pytest_entrypoint = ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "--suppress-tests-failed-exit-code"]

    cat_container = with_bound_docker_host(context, base_container).with_entrypoint([])
    cat_container = cat_container.with_exec(["pip", "install", "pytest-custom_exit_code"])
    cat_container = cat_container.with_mounted_directory("/test_input", connector_dir)
    cat_container = cat_container.with_env_variable("CONNECTOR_IMAGE_ID", image_sha)
    # This busts the CAT cached results every day.
    # In case of a partially failing nightly build, the connectors that already ran CAT won't re-run CAT,
    # while we keep the guarantee that CAT runs at least once a day.
    cat_container = cat_container.with_env_variable("CACHEBUSTER", datetime.utcnow().strftime("%Y%m%d"))
    cat_container = cat_container.with_workdir("/test_input")
    cat_container = cat_container.with_entrypoint(pytest_entrypoint)
    cat_container = cat_container.with_(mounted_connector_secrets(context, "/test_input/secrets"))
    # The entrypoint is pytest, so this exec only passes the remaining pytest arguments.
    return cat_container.with_exec(["--acceptance-test-config", "/test_input"])


def with_gradle(
context: ConnectorContext,
sources_to_include: List[str] = None,
Expand Down
100 changes: 39 additions & 61 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
import anyio
import asyncer
from anyio import Path
from pipelines import sentry_utils

from connector_ops.utils import console
from dagger import Container, DaggerError, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines import sentry_utils
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result
from rich.console import Group
from rich.panel import Panel
from rich.style import Style
Expand Down Expand Up @@ -56,26 +55,6 @@ class StepStatus(Enum):
FAILURE = "Failed"
SKIPPED = "Skipped"

@staticmethod
def from_exit_code(exit_code: int) -> StepStatus:
    """Map a process exit code to a step status.

    This function never raises: any exit code that is not explicitly mapped is considered a failure.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS for 0, SKIPPED for 5, FAILURE for any other exit code.
    """
    if exit_code == 0:
        return StepStatus.SUCCESS
    # pytest returns a 5 exit code when no test is found.
    if exit_code == 5:
        return StepStatus.SKIPPED
    return StepStatus.FAILURE

def get_rich_style(self) -> Style:
"""Match color used in the console output to the step status."""
if self is StepStatus.SUCCESS:
Expand Down Expand Up @@ -104,6 +83,8 @@ class Step(ABC):
title: ClassVar[str]
max_retries: ClassVar[int] = 0
should_log: ClassVar[bool] = True
success_exit_code: ClassVar[int] = 0
skipped_exit_code: ClassVar[int] = None
# The max duration of a step run. If the step run for more than this duration it will be considered as timed out.
# The default of 5 hours is arbitrary and can be changed if needed.
max_duration: ClassVar[timedelta] = timedelta(hours=5)
Expand All @@ -124,19 +105,23 @@ def run_duration(self) -> timedelta:
@property
def logger(self) -> logging.Logger:
if self.should_log:
return self.context.logger
return logging.getLogger(f"{self.context.pipeline_name} - {self.title}")
else:
disabled_logger = logging.getLogger()
disabled_logger.disabled = True
return disabled_logger

@property
def dagger_client(self) -> Container:
    """Return the context's dagger client scoped to a pipeline named after this step's title."""
    step_scoped_client = self.context.dagger_client.pipeline(self.title)
    return step_scoped_client

async def log_progress(self, completion_event: anyio.Event) -> None:
"""Log the step progress every 30 seconds until the step is done."""
while not completion_event.is_set():
duration = datetime.utcnow() - self.started_at
elapsed_seconds = duration.total_seconds()
if elapsed_seconds > 30 and round(elapsed_seconds) % 30 == 0:
self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})")
self.logger.info(f"⏳ Still running... (duration: {format_duration(duration)})")
await anyio.sleep(1)

async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs) -> StepResult:
Expand Down Expand Up @@ -174,7 +159,7 @@ async def run(self, *args, **kwargs) -> StepResult:
if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0:
self.retry_count += 1
await anyio.sleep(10)
self.logger.warn(f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}.")
self.logger.warn(f"Retry #{self.retry_count}.")
return await self.run(*args, **kwargs)
self.stopped_at = datetime.utcnow()
self.log_step_result(result)
Expand All @@ -192,11 +177,11 @@ def log_step_result(self, result: StepResult) -> None:
"""
duration = format_duration(self.run_duration)
if result.status is StepStatus.FAILURE:
self.logger.error(f"{result.status.get_emoji()} {self.title} failed (duration: {duration})")
self.logger.error(f"{result.status.get_emoji()} failed (duration: {duration})")
if result.status is StepStatus.SKIPPED:
self.logger.info(f"{result.status.get_emoji()} {self.title} was skipped (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was skipped (duration: {duration})")
if result.status is StepStatus.SUCCESS:
self.logger.info(f"{result.status.get_emoji()} {self.title} was successful (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was successful (duration: {duration})")

@abstractmethod
async def _run(self, *args, **kwargs) -> StepResult:
Expand All @@ -218,6 +203,28 @@ def skip(self, reason: str = None) -> StepResult:
"""
return StepResult(self, StepStatus.SKIPPED, stdout=reason)

def get_step_status_from_exit_code(
    self,
    exit_code: int,
) -> StepStatus:
    """Map a process exit code to a step status, using the exit codes declared on the step.

    This method never raises: any exit code that matches neither `success_exit_code`
    nor `skipped_exit_code` is considered a failure.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS if the exit code equals `self.success_exit_code`,
            SKIPPED if `self.skipped_exit_code` is set and equals the exit code,
            FAILURE otherwise.
    """
    if exit_code == self.success_exit_code:
        return StepStatus.SUCCESS
    # A None skipped_exit_code means the step declares no "skipped" exit code at all.
    if self.skipped_exit_code is not None and exit_code == self.skipped_exit_code:
        return StepStatus.SKIPPED
    return StepStatus.FAILURE

async def get_step_result(self, container: Container) -> StepResult:
"""Concurrent retrieval of exit code, stdout and stderr of a container.
Expand All @@ -232,7 +239,7 @@ async def get_step_result(self, container: Container) -> StepResult:
exit_code, stdout, stderr = await get_exec_result(container)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=container,
Expand All @@ -249,31 +256,7 @@ def _get_timed_out_step_result(self) -> StepResult:
class PytestStep(Step, ABC):
"""An abstract class to run pytest tests and evaluate success or failure according to pytest logs."""

async def write_log_file(self, logs: str) -> str:
    """Write the pytest logs to a file under the connector's `airbyte_ci_logs` directory.

    The file name is derived from the slugified step title, with dashes replaced by underscores.

    Args:
        logs (str): The pytest logs to persist.

    Returns:
        str: The resolved path of the log file that was written.
    """
    log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs")
    await log_directory.mkdir(exist_ok=True)
    log_file_name = f"{slugify(self.title).replace('-', '_')}.log"
    log_path = await (log_directory / log_file_name).resolve()
    await log_path.write_text(logs)
    self.logger.info(f"Pytest logs written to {log_path}")
    # The signature and the docstring promise the path: return it instead of an implicit None.
    return str(log_path)

# NOTE: this relies on pytest printing its summary as the last non-blank log line.
# If pytest crashes before printing it, the result is inferred from whatever line comes last.
def pytest_logs_to_step_result(self, logs: str) -> StepResult:
    """Parse pytest logs and infer failure, success or skipping from the summary line.

    Args:
        logs (str): The pytest logs.

    Returns:
        StepResult: The step result inferred from the last non-blank log line.
            A FAILURE is returned when the logs contain no non-blank line at all.
    """
    non_blank_lines = [line for line in logs.splitlines() if line.strip()]
    if not non_blank_lines:
        # Without any output there is no summary to interpret: never report this as a success.
        return StepResult(self, StepStatus.FAILURE, stderr=logs)

    last_log_line = non_blank_lines[-1]
    # "error" matches both the singular ("1 error") and the plural ("2 errors") pytest summaries.
    if "failed" in last_log_line or "error" in last_log_line:
        return StepResult(self, StepStatus.FAILURE, stderr=logs)
    if "no tests ran" in last_log_line:
        return StepResult(self, StepStatus.SKIPPED, stdout=logs)
    return StepResult(self, StepStatus.SUCCESS, stdout=logs)
skipped_exit_code = 5

async def _run_tests_in_directory(self, connector_under_test: Container, test_directory: str) -> StepResult:
"""Run the pytest tests in the test_directory that was passed.
Expand All @@ -294,18 +277,13 @@ async def _run_tests_in_directory(self, connector_under_test: Container, test_di
"python",
"-m",
"pytest",
"--suppress-tests-failed-exit-code",
"--suppress-no-test-exit-code",
"-s",
test_directory,
"-c",
test_config,
]
)
logs = await tester.stdout()
if self.context.is_local:
await self.write_log_file(logs)
return self.pytest_logs_to_step_result(logs)
return await self.get_step_result(tester)

else:
return StepResult(self, StepStatus.SKIPPED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from pipelines.actions import environments
from pipelines.bases import StepResult, StepStatus
from pipelines.bases import StepResult
from pipelines.gradle import GradleTask
from pipelines.utils import get_exec_result

Expand All @@ -25,7 +25,7 @@ async def _run(self) -> StepResult:
exit_code, stdout, stderr = await get_exec_result(formatted)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=formatted.directory(str(self.context.connector.code_directory)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncer
from pipelines.actions import environments
from pipelines.bases import Step, StepResult, StepStatus
from pipelines.bases import Step, StepResult
from pipelines.utils import with_exit_code, with_stderr, with_stdout


Expand Down Expand Up @@ -50,7 +50,7 @@ async def _run(self) -> StepResult:

return StepResult(
self,
StepStatus.from_exit_code(soon_exit_code.value),
self.get_step_status_from_exit_code(await soon_exit_code),
stderr=soon_stderr.value,
stdout=soon_stdout.value,
output_artifact=formatted.directory("/connector_code"),
Expand Down
29 changes: 27 additions & 2 deletions airbyte-ci/connectors/pipelines/pipelines/hacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable, List

import requests
import yaml
from connector_ops.utils import ConnectorLanguage
from dagger import DaggerError

if TYPE_CHECKING:
from dagger import Client, Container, Directory
from pipelines.contexts import ConnectorContext
from dagger import Client, Directory


LINES_TO_REMOVE_FROM_GRADLE_FILE = [
Expand Down Expand Up @@ -140,3 +140,28 @@ async def cache_latest_cdk(dagger_client: Client, pip_cache_volume_name: str = "
.with_exec(["pip", "install", "--force-reinstall", f"airbyte-cdk=={cdk_latest_version}"])
.sync()
)


def never_fail_exec(command: List[str]) -> Callable:
    """
    Wrap a command execution with some bash sugar to always exit with a 0 exit code but write the actual exit code to a file.

    Underlying issue:
        When a classic dagger with_exec returns a >0 exit code, an ExecError is raised.
        It's OK for the majority of our container interactions.
        But some executions, like running CAT, are expected to often fail.
        In CAT we don't want ExecError to be raised on container interaction because CAT might write updated secrets
        that we need to pull from the container after the test run.
        The bash trick below is a hack to always return a 0 exit code but write the actual exit code to a file.
        The file is then read by the pipeline to determine the exit code of the container.

    Args:
        command (List[str]): The command to run in the container, as a list of arguments.
            Each argument is shell-quoted, so arguments containing spaces or shell metacharacters
            are passed to the command verbatim.

    Returns:
        Callable: A function taking a container and returning it with the wrapped command executed.
            The exit code of the command is written to /exit_code in the container.
    """
    import shlex

    def never_fail_exec_inner(container: "Container") -> "Container":
        # shlex.join quotes every argument so that `sh -c` does not re-split or interpret them.
        shell_command = f"{shlex.join(command)}; echo $? > /exit_code"
        return container.with_exec(["sh", "-c", shell_command])

    return never_fail_exec_inner
Loading

0 comments on commit c7dce72

Please sign in to comment.