Skip to content

Commit

Permalink
connectors-ci: improve pytest result evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
alafanechere committed Jul 27, 2023
1 parent 9f6963c commit ddb7345
Show file tree
Hide file tree
Showing 10 changed files with 414 additions and 130 deletions.
46 changes: 4 additions & 42 deletions airbyte-ci/connectors/pipelines/pipelines/actions/environments.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,23 @@

import importlib.util
import json
import toml
import re
import uuid
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, Optional

import yaml
import toml
from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret
from dagger.engine._version import CLI_VERSION as dagger_engine_version
from pipelines import consts
from pipelines.consts import (
CONNECTOR_OPS_SOURCE_PATHSOURCE_PATH,
CI_CREDENTIALS_SOURCE_PATH,
CONNECTOR_OPS_SOURCE_PATHSOURCE_PATH,
CONNECTOR_TESTING_REQUIREMENTS,
LICENSE_SHORT_FILE_PATH,
PYPROJECT_TOML_FILE_PATH,
)
from pipelines.utils import get_file_contents
from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret
from dagger.engine._version import CLI_VERSION as dagger_engine_version

if TYPE_CHECKING:
from pipelines.contexts import ConnectorContext, PipelineContext
Expand Down Expand Up @@ -502,42 +500,6 @@ def with_docker_cli(context: ConnectorContext) -> Container:
return with_bound_docker_host(context, docker_cli)


async def with_connector_acceptance_test(context: ConnectorContext, connector_under_test_image_tar: File) -> Container:
    """Create a container to run connector acceptance tests, bound to a persistent docker host.

    Args:
        context (ConnectorContext): The current connector context.
        connector_under_test_image_tar (File): The file containing the tar archive of the connector-under-test image.

    Returns:
        Container: A container with connector acceptance tests installed.
    """
    test_input = await context.get_connector_dir()
    raw_cat_config = await test_input.file("acceptance-test-config.yml").contents()
    cat_config = yaml.safe_load(raw_cat_config)

    image_sha = await load_image_to_docker_host(context, connector_under_test_image_tar, cat_config["connector_image"])

    # A ":dev" CAT image means "build connector-acceptance-test from local sources".
    if context.connector_acceptance_test_image.endswith(":dev"):
        cat_container = context.connector_acceptance_test_source_dir.docker_build()
    else:
        cat_container = context.dagger_client.container().from_(context.connector_acceptance_test_image)

    cat_container = with_bound_docker_host(context, cat_container)
    cat_container = cat_container.with_entrypoint([])
    cat_container = cat_container.with_exec(["pip", "install", "pytest-custom_exit_code"])
    cat_container = cat_container.with_mounted_directory("/test_input", test_input)
    cat_container = cat_container.with_env_variable("CONNECTOR_IMAGE_ID", image_sha)
    # Bust the CAT cache daily: in a partially failing nightly build, connectors that
    # already ran CAT won't re-run it, while CAT is still guaranteed to run every day.
    cat_container = cat_container.with_env_variable("CACHEBUSTER", datetime.utcnow().strftime("%Y%m%d"))
    cat_container = cat_container.with_workdir("/test_input")
    cat_container = cat_container.with_entrypoint(
        ["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "--suppress-tests-failed-exit-code"]
    )
    cat_container = cat_container.with_(mounted_connector_secrets(context, "/test_input/secrets"))
    return cat_container.with_exec(["--acceptance-test-config", "/test_input"])


def with_gradle(
context: ConnectorContext,
sources_to_include: List[str] = None,
Expand Down
134 changes: 73 additions & 61 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
import anyio
import asyncer
from anyio import Path
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
from connector_ops.utils import console
from dagger import Container, DaggerError, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
from rich.console import Group
from rich.panel import Panel
from rich.style import Style
Expand Down Expand Up @@ -54,26 +54,6 @@ class StepStatus(Enum):
FAILURE = "Failed"
SKIPPED = "Skipped"

def from_exit_code(exit_code: int) -> StepStatus:
    """Map a process exit code to a step status.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS for 0, SKIPPED for 5 (pytest exits with 5 when no
        test is collected), FAILURE for any other exit code.
    """
    status_by_exit_code = {
        0: StepStatus.SUCCESS,
        # pytest returns a 5 exit code when no test is found.
        5: StepStatus.SKIPPED,
    }
    return status_by_exit_code.get(exit_code, StepStatus.FAILURE)

def get_rich_style(self) -> Style:
"""Match color used in the console output to the step status."""
if self is StepStatus.SUCCESS:
Expand Down Expand Up @@ -102,6 +82,9 @@ class Step(ABC):
title: ClassVar[str]
max_retries: ClassVar[int] = 0
should_log: ClassVar[bool] = True
should_persist_stdout_stderr_logs: ClassVar[bool] = True
success_exit_code: ClassVar[int] = 0
skipped_exit_code: ClassVar[int] = None

def __init__(self, context: PipelineContext) -> None: # noqa D107
self.context = context
Expand All @@ -119,18 +102,22 @@ def run_duration(self) -> timedelta:
@property
def logger(self) -> logging.Logger:
    """Return a logger named after the pipeline and step, or a muted logger when logging is disabled."""
    if not self.should_log:
        muted_logger = logging.getLogger()
        muted_logger.disabled = True
        return muted_logger
    return logging.getLogger(f"{self.context.pipeline_name} - {self.title}")

@property
def dagger_client(self) -> Container:
    """Return the context's Dagger client scoped to a sub-pipeline named after this step."""
    step_pipeline = self.context.dagger_client.pipeline(self.title)
    return step_pipeline

async def log_progress(self, completion_event) -> None:
    """Log a liveness message roughly every 30 seconds until completion_event is set."""
    while not completion_event.is_set():
        elapsed = datetime.utcnow() - self.started_at
        elapsed_seconds = elapsed.total_seconds()
        # Only start reporting after 30s, then once per 30s boundary (loop ticks every second).
        if elapsed_seconds > 30 and round(elapsed_seconds) % 30 == 0:
            self.logger.info(f"⏳ Still running... (duration: {format_duration(elapsed)})")
        await anyio.sleep(1)

async def run_with_completion(self, completion_event, *args, **kwargs) -> StepResult:
Expand Down Expand Up @@ -159,7 +146,7 @@ async def run(self, *args, **kwargs) -> StepResult:
if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0:
self.retry_count += 1
await anyio.sleep(10)
self.logger.warn(f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}.")
self.logger.warn(f"Retry #{self.retry_count}.")
return await self.run(*args, **kwargs)
self.stopped_at = datetime.utcnow()
self.log_step_result(result)
Expand All @@ -177,11 +164,11 @@ def log_step_result(self, result: StepResult) -> None:
"""
duration = format_duration(self.run_duration)
if result.status is StepStatus.FAILURE:
self.logger.error(f"{result.status.get_emoji()} {self.title} failed (duration: {duration})")
self.logger.error(f"{result.status.get_emoji()} failed (duration: {duration})")
if result.status is StepStatus.SKIPPED:
self.logger.info(f"{result.status.get_emoji()} {self.title} was skipped (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was skipped (duration: {duration})")
if result.status is StepStatus.SUCCESS:
self.logger.info(f"{result.status.get_emoji()} {self.title} was successful (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was successful (duration: {duration})")

@abstractmethod
async def _run(self, *args, **kwargs) -> StepResult:
Expand All @@ -203,6 +190,58 @@ def skip(self, reason: str = None) -> StepResult:
"""
return StepResult(self, StepStatus.SKIPPED, stdout=reason)

async def write_log_files(self, stdout: Optional[str] = None, stderr: Optional[str] = None) -> List[Path]:
"""Write stdout and stderr logs to a file in the connector code directory.
Args:
stdout (Optional[str], optional): The step final container stdout. Defaults to None.
stderr (Optional[str], optional): The step final container stderr. Defaults to None.
Returns:
List[Path]: The list of written log files.
"""
if not stdout and not stderr:
return []

written_log_files = []
log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs/{slugify(self.context.pipeline_name)}")
await log_directory.mkdir(exist_ok=True, parents=True)
if stdout:
# TODO alafanechere we could also log the stdout and stderr of the container in the pipeline context.
# It could be a nice alternative to the --show-dagger-logs flag.
stdout_log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}_stdout.log").resolve()
await stdout_log_path.write_text(stdout)
self.logger.info(f"stdout logs written to {stdout_log_path}")
written_log_files.append(stdout_log_path)
if stderr:
stderr_log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}_stderr.log").resolve()
await stderr_log_path.write_text(stderr)
self.logger.info(f"stderr logs written to {stderr_log_path}")
written_log_files.append(stderr_log_path)
return written_log_files

def get_step_status_from_exit_code(
    self,
    exit_code: int,
) -> StepStatus:
    """Map a process exit code to a step status using the step's configured exit codes.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS when exit_code matches success_exit_code, SKIPPED when
        a skipped_exit_code is configured and matches, FAILURE otherwise.
    """
    status = StepStatus.FAILURE
    if exit_code == self.success_exit_code:
        status = StepStatus.SUCCESS
    elif self.skipped_exit_code is not None and exit_code == self.skipped_exit_code:
        status = StepStatus.SKIPPED
    return status

async def get_step_result(self, container: Container) -> StepResult:
"""Concurrent retrieval of exit code, stdout and stdout of a container.
Expand All @@ -215,9 +254,11 @@ async def get_step_result(self, container: Container) -> StepResult:
StepResult: Failure or success with stdout and stderr.
"""
exit_code, stdout, stderr = await get_exec_result(container)
if self.context.is_local and self.should_persist_stdout_stderr_logs:
await self.write_log_files(stdout, stderr)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=container,
Expand All @@ -227,31 +268,7 @@ async def get_step_result(self, container: Container) -> StepResult:
class PytestStep(Step, ABC):
"""An abstract class to run pytest tests and evaluate success or failure according to pytest logs."""

async def write_log_file(self, logs) -> str:
    """Write pytest logs to a file in the connector code directory.

    Args:
        logs (str): The pytest output to persist.

    Returns:
        str: The path to the written pytest log file.
    """
    log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs")
    await log_directory.mkdir(exist_ok=True)
    log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}.log").resolve()
    await log_path.write_text(logs)
    self.logger.info(f"Pytest logs written to {log_path}")
    # Bug fix: the docstring and `-> str` annotation promise the log path,
    # but the original implementation fell through and returned None.
    return log_path

# TODO this is not very robust if pytest crashes and does not output its expected last log line.
def pytest_logs_to_step_result(self, logs: str) -> StepResult:
    """Parse pytest logs and infer failure, success or skipping.

    The decision is based on pytest's summary line, which is the second-to-last
    line of its output.

    Args:
        logs (str): The pytest logs.

    Returns:
        StepResult: The inferred step result according to the log.
    """
    log_lines = logs.split("\n")
    # Robustness fix: fewer than two lines means pytest never printed its summary
    # line (e.g. it crashed or produced no output); the original code raised
    # IndexError here. Treat truncated output as a failure.
    if len(log_lines) < 2:
        return StepResult(self, StepStatus.FAILURE, stderr=logs)
    last_log_line = log_lines[-2]
    if "failed" in last_log_line or "errors" in last_log_line:
        return StepResult(self, StepStatus.FAILURE, stderr=logs)
    elif "no tests ran" in last_log_line:
        return StepResult(self, StepStatus.SKIPPED, stdout=logs)
    else:
        return StepResult(self, StepStatus.SUCCESS, stdout=logs)
skipped_exit_code = 5

async def _run_tests_in_directory(self, connector_under_test: Container, test_directory: str) -> StepResult:
"""Run the pytest tests in the test_directory that was passed.
Expand All @@ -272,18 +289,13 @@ async def _run_tests_in_directory(self, connector_under_test: Container, test_di
"python",
"-m",
"pytest",
"--suppress-tests-failed-exit-code",
"--suppress-no-test-exit-code",
"-s",
test_directory,
"-c",
test_config,
]
)
logs = await tester.stdout()
if self.context.is_local:
await self.write_log_file(logs)
return self.pytest_logs_to_step_result(logs)
return await self.get_step_result(tester)

else:
return StepResult(self, StepStatus.SKIPPED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from pipelines.actions import environments
from pipelines.bases import StepResult, StepStatus
from pipelines.bases import StepResult
from pipelines.gradle import GradleTask
from pipelines.utils import get_exec_result

Expand All @@ -25,7 +25,7 @@ async def _run(self) -> StepResult:
exit_code, stdout, stderr = await get_exec_result(formatted)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=formatted.directory(str(self.context.connector.code_directory)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncer
from pipelines.actions import environments
from pipelines.bases import Step, StepResult, StepStatus
from pipelines.bases import Step, StepResult
from pipelines.utils import with_exit_code, with_stderr, with_stdout


Expand Down Expand Up @@ -50,7 +50,7 @@ async def _run(self) -> StepResult:

return StepResult(
self,
StepStatus.from_exit_code(soon_exit_code.value),
self.get_step_status_from_exit_code(await soon_exit_code),
stderr=soon_stderr.value,
stdout=soon_stdout.value,
output_artifact=formatted.directory("/connector_code"),
Expand Down
29 changes: 27 additions & 2 deletions airbyte-ci/connectors/pipelines/pipelines/hacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable, List

import requests
import yaml
from connector_ops.utils import ConnectorLanguage
from dagger import DaggerError

if TYPE_CHECKING:
from dagger import Client, Container, Directory
from pipelines.contexts import ConnectorContext
from dagger import Client, Directory


LINES_TO_REMOVE_FROM_GRADLE_FILE = [
Expand Down Expand Up @@ -140,3 +140,28 @@ async def cache_latest_cdk(dagger_client: Client, pip_cache_volume_name: str = "
.with_exec(["pip", "install", "--force-reinstall", f"airbyte-cdk=={cdk_latest_version}"])
.sync()
)


def never_fail_exec(command: List[str]) -> Callable:
    """
    Wrap a command execution with some shell sugar to always exit with a 0 exit code but write the actual exit code to a file.

    Underlying issue:
        When a classic dagger with_exec is returning a >0 exit code an ExecError is raised.
        It's OK for the majority of our container interaction.
        But some execution, like running CAT, are expected to often fail.
        In CAT we don't want ExecError to be raised on container interaction because CAT might write updated secrets that we need to pull from the container after the test run.
        The shell trick below is a hack to always return a 0 exit code but write the actual exit code to a file.
        The file is then read by the pipeline to determine the exit code of the container.

    Args:
        command (List[str]): The command to run in the container.

    Returns:
        Callable: A function taking a Container and returning that Container with the
        wrapped exec applied; the wrapped command always exits 0 and writes the real
        exit code to /exit_code.
    """

    def never_fail_exec_inner(container: Container):
        # NOTE: the command tokens are joined naively, so tokens containing spaces or
        # shell metacharacters would need quoting by the caller.
        return container.with_exec(["sh", "-c", f"{' '.join(command)}; echo $? > /exit_code"])

    return never_fail_exec_inner
Loading

0 comments on commit ddb7345

Please sign in to comment.