Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connectors-ci: improve pytest result evaluation #28767

8 changes: 8 additions & 0 deletions .github/workflows/airbyte_ci_tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
name: Airbyte CI pipeline tests

on:
  push:
    # A `branches` filter containing only a negated pattern (`!master`) is invalid:
    # unquoted `!` is a YAML tag, and GitHub Actions requires at least one positive
    # pattern before a negation. `branches-ignore` is the documented way to run on
    # every branch except master.
    branches-ignore:
      - master
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are our tests now passing for pipelines? Did I miss that PR?!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, it was a WIP push from a different branch: #28857

paths:
- "airbyte-ci/"
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
import json
import re
import uuid
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Callable, List, Optional

import toml
import yaml
from dagger import CacheVolume, Client, Container, DaggerError, Directory, File, Platform, Secret
from dagger.engine._version import CLI_VERSION as dagger_engine_version
from pipelines import consts
Expand Down Expand Up @@ -502,42 +500,6 @@ def with_docker_cli(context: ConnectorContext) -> Container:
return with_bound_docker_host(context, docker_cli)


async def with_connector_acceptance_test(context: ConnectorContext, connector_under_test_image_tar: File) -> Container:
    """Create a container to run connector acceptance tests, bound to a persistent docker host.

    Args:
        context (ConnectorContext): The current connector context.
        connector_under_test_image_tar (File): The file containing the tar archive the image of the connector under test.
    Returns:
        Container: A container with connector acceptance tests installed.
    """
    # The connector source directory is mounted as CAT's test input.
    test_input = await context.get_connector_dir()
    # The acceptance test config declares (among other things) which image CAT should test.
    cat_config = yaml.safe_load(await test_input.file("acceptance-test-config.yml").contents())

    # Side effect: loads the connector image tar into the shared docker host so CAT can run it.
    image_sha = await load_image_to_docker_host(context, connector_under_test_image_tar, cat_config["connector_image"])

    # ":dev" means "build CAT from local sources" instead of pulling a released image.
    if context.connector_acceptance_test_image.endswith(":dev"):
        cat_container = context.connector_acceptance_test_source_dir.docker_build()
    else:
        cat_container = context.dagger_client.container().from_(context.connector_acceptance_test_image)

    return (
        with_bound_docker_host(context, cat_container)
        # Clear the image entrypoint so the pip install exec below runs as-is.
        .with_entrypoint([])
        .with_exec(["pip", "install", "pytest-custom_exit_code"])
        .with_mounted_directory("/test_input", test_input)
        .with_env_variable("CONNECTOR_IMAGE_ID", image_sha)
        # This bursts the CAT cached results everyday.
        # It's cool because in case of a partially failing nightly build the connectors that already ran CAT won't re-run CAT.
        # We keep the guarantee that a CAT runs everyday.
        .with_env_variable("CACHEBUSTER", datetime.utcnow().strftime("%Y%m%d"))
        .with_workdir("/test_input")
        # The entrypoint is pytest itself; the final with_exec supplies pytest's arguments.
        # --suppress-tests-failed-exit-code (from pytest-custom_exit_code) keeps the exec
        # from raising on test failures so the pipeline can inspect the results afterwards.
        .with_entrypoint(["python", "-m", "pytest", "-p", "connector_acceptance_test.plugin", "--suppress-tests-failed-exit-code"])
        .with_(mounted_connector_secrets(context, "/test_input/secrets"))
        .with_exec(["--acceptance-test-config", "/test_input"])
    )


def with_gradle(
context: ConnectorContext,
sources_to_include: List[str] = None,
Expand Down
131 changes: 71 additions & 60 deletions airbyte-ci/connectors/pipelines/pipelines/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
import anyio
import asyncer
from anyio import Path
from pipelines import sentry_utils

from connector_ops.utils import console
from dagger import Container, DaggerError, QueryError
from jinja2 import Environment, PackageLoader, select_autoescape
from pipelines import sentry_utils
from pipelines.actions import remote_storage
from pipelines.consts import GCS_PUBLIC_DOMAIN, LOCAL_REPORTS_PATH_ROOT, PYPROJECT_TOML_FILE_PATH
from pipelines.utils import check_path_in_workdir, format_duration, get_exec_result, slugify
Expand Down Expand Up @@ -56,26 +55,6 @@ class StepStatus(Enum):
FAILURE = "Failed"
SKIPPED = "Skipped"

def from_exit_code(exit_code: int) -> StepStatus:
    """Map a process exit code to a step status.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS for 0, SKIPPED for 5 (pytest's "no tests
        collected" exit code), FAILURE for any other exit code.
    """
    if exit_code == 0:
        return StepStatus.SUCCESS
    # pytest returns a 5 exit code when no test is found.
    elif exit_code == 5:
        return StepStatus.SKIPPED
    else:
        return StepStatus.FAILURE

def get_rich_style(self) -> Style:
"""Match color used in the console output to the step status."""
if self is StepStatus.SUCCESS:
Expand Down Expand Up @@ -104,6 +83,9 @@ class Step(ABC):
title: ClassVar[str]  # Human-readable step title, used by the step logger and result logging.
max_retries: ClassVar[int] = 0  # Number of retries on FAILURE; 0 disables retrying.
should_log: ClassVar[bool] = True  # When False the step logger is disabled.
should_persist_stdout_stderr_logs: ClassVar[bool] = True  # Write container stdout/stderr to log files on local runs.
success_exit_code: ClassVar[int] = 0  # Exit code mapped to StepStatus.SUCCESS by get_step_status_from_exit_code.
skipped_exit_code: ClassVar[Optional[int]] = None  # Exit code mapped to StepStatus.SKIPPED; None means nothing maps to SKIPPED.
# The max duration of a step run. If the step run for more than this duration it will be considered as timed out.
# The default of 5 hours is arbitrary and can be changed if needed.
max_duration: ClassVar[timedelta] = timedelta(hours=5)
Expand All @@ -124,19 +106,23 @@ def run_duration(self) -> timedelta:
@property
def logger(self) -> logging.Logger:
if self.should_log:
return self.context.logger
return logging.getLogger(f"{self.context.pipeline_name} - {self.title}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

else:
disabled_logger = logging.getLogger()
disabled_logger.disabled = True
return disabled_logger

@property
def dagger_client(self) -> Container:
    # Scopes the context's dagger client to a sub-pipeline named after this step.
    # NOTE(review): the `Container` return annotation looks wrong — dagger's
    # `Client.pipeline` returns a `Client`, not a `Container`; confirm and fix.
    return self.context.dagger_client.pipeline(self.title)

async def log_progress(self, completion_event: anyio.Event) -> None:
"""Log the step progress every 30 seconds until the step is done."""
while not completion_event.is_set():
duration = datetime.utcnow() - self.started_at
elapsed_seconds = duration.total_seconds()
if elapsed_seconds > 30 and round(elapsed_seconds) % 30 == 0:
self.logger.info(f"⏳ Still running {self.title}... (duration: {format_duration(duration)})")
self.logger.info(f"⏳ Still running... (duration: {format_duration(duration)})")
await anyio.sleep(1)

async def run_with_completion(self, completion_event: anyio.Event, *args, **kwargs) -> StepResult:
Expand Down Expand Up @@ -174,7 +160,7 @@ async def run(self, *args, **kwargs) -> StepResult:
if result.status is StepStatus.FAILURE and self.retry_count <= self.max_retries and self.max_retries > 0:
self.retry_count += 1
await anyio.sleep(10)
self.logger.warn(f"Retry #{self.retry_count} for {self.title} step on connector {self.context.connector.technical_name}.")
self.logger.warn(f"Retry #{self.retry_count}.")
return await self.run(*args, **kwargs)
self.stopped_at = datetime.utcnow()
self.log_step_result(result)
Expand All @@ -192,11 +178,11 @@ def log_step_result(self, result: StepResult) -> None:
"""
duration = format_duration(self.run_duration)
if result.status is StepStatus.FAILURE:
self.logger.error(f"{result.status.get_emoji()} {self.title} failed (duration: {duration})")
self.logger.error(f"{result.status.get_emoji()} failed (duration: {duration})")
if result.status is StepStatus.SKIPPED:
self.logger.info(f"{result.status.get_emoji()} {self.title} was skipped (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was skipped (duration: {duration})")
if result.status is StepStatus.SUCCESS:
self.logger.info(f"{result.status.get_emoji()} {self.title} was successful (duration: {duration})")
self.logger.info(f"{result.status.get_emoji()} was successful (duration: {duration})")

@abstractmethod
async def _run(self, *args, **kwargs) -> StepResult:
Expand All @@ -218,6 +204,58 @@ def skip(self, reason: str = None) -> StepResult:
"""
return StepResult(self, StepStatus.SKIPPED, stdout=reason)

async def write_log_files(self, stdout: Optional[str] = None, stderr: Optional[str] = None) -> List[Path]:
"""Write stdout and stderr logs to a file in the connector code directory.

Args:
stdout (Optional[str], optional): The step final container stdout. Defaults to None.
stderr (Optional[str], optional): The step final container stderr. Defaults to None.

Returns:
List[Path]: The list of written log files.
"""
if not stdout and not stderr:
return []

written_log_files = []
log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs/{slugify(self.context.pipeline_name)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use connector here if this is our generic Step class?

await log_directory.mkdir(exist_ok=True, parents=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doing all this manual stderr/out handling and log writing makes me nervous.

Particularly now that they write to not just our ci folder but also the connector folder

My worries generally come down to If we keep writing log handling at a low level

  1. We will have to keep repeating our logging code
  2. More importantly, we keep repeating if statements like if self.context.is_local and self.should_persist_stdout_stderr_logs: which, if missed, lead to tricky bugs
  3. And if we go this route for too long, it will be very hard to go back as there will be a lot of code to refactor.

This leads me to ask What is stopping us from moving all our log writing/handling to the dagger client boundary here:
https://github.com/airbytehq/airbyte/blob/master/airbyte-ci/connectors/pipelines/pipelines/pipelines/connectors.py#L85

For example a possible solution could be

  1. Write our own logger that extends the typical python logger by adding pipeline specific context
    1. pipeline name
    2. pipeline step
    3. etc.
  2. Write our own subclass of TextIOWrapper that based on the new pipeline context we stuff into the logger
    1. writes a log file per pipeline
    2. writes a log file per step
    3. writes a root log file representing everything
    4. Decides if we are also writing to stdout/stderr
    5. Decides if we need to send these logs anywhere else (e.g. sentry, datadog, cloudwatch etc)

If this is possible and is a good idea.

can we make a small step towards this now by

  1. Moving the decision of whether or not we write to a log file the decision of a custom logger
  2. Moving the writing logic to the logger

e.g. https://stackoverflow.com/questions/6386698/how-to-write-to-a-file-using-the-logging-python-module

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bnchrch let's keep the logging change for a different PR then.
But just to react on top of your comment:

My worries generally come down to If we keep writing log handling at a low level

This is not low level IMO because it's a change made at the Step class level, so we centralize all the logging logic on the base class.

Write our own logger that extends the typical python logger by adding pipeline specific context

This is what this change did: the step have a specific logger with pipeline and step name.

Write our own subclass of TextIOWrapper that based on the new pipeline context we stuff into the logger

In this context you mean parsing the dagger client logs. Splitting them into different per step/pipeline logs file will be very brittle as we have no control on the shape of the dagger logs, I'd rather ask the dagger team to provide such a feature and IMO it directly overlaps with the log access in the Dagger Web UI.
Moreover, if steps are cached their stdout is not shown in the logs (it is replaced with CACHED).

can we make a small step towards this now by
Moving the decision of whether or not we write to a log file the decision of a custom logger
Moving the writing logic to the logger.

Yep you're right that using a combo of logging to stdout + write logic can definitely be handled at the logger level. Let's do it later and groom #28423 a bit more.
I think we should be clearer in differentiating what we mean by Dagger logs, step logs, pipeline logs, etc.

if stdout:
# TODO alafanechere we could also log the stdout and stderr of the container in the pipeline context.
# It could be a nice alternative to the --show-dagger-logs flag.
stdout_log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}_stdout.log").resolve()
await stdout_log_path.write_text(stdout)
self.logger.info(f"stdout logs written to {stdout_log_path}")
written_log_files.append(stdout_log_path)
if stderr:
stderr_log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}_stderr.log").resolve()
await stderr_log_path.write_text(stderr)
self.logger.info(f"stderr logs written to {stderr_log_path}")
written_log_files.append(stderr_log_path)
return written_log_files

def get_step_status_from_exit_code(
    self,
    exit_code: int,
) -> StepStatus:
    """Map an exit code to a step status.

    The mapping is driven by the class-level ``success_exit_code`` and
    ``skipped_exit_code`` attributes so subclasses (e.g. pytest-based steps)
    can customize which exit codes mean success or skip.

    Args:
        exit_code (int): A process exit code.

    Returns:
        StepStatus: SUCCESS when the code equals ``success_exit_code``,
        SKIPPED when it equals a non-None ``skipped_exit_code``,
        FAILURE otherwise.
    """
    if exit_code == self.success_exit_code:
        return StepStatus.SUCCESS
    elif self.skipped_exit_code is not None and exit_code == self.skipped_exit_code:
        return StepStatus.SKIPPED
    else:
        return StepStatus.FAILURE

async def get_step_result(self, container: Container) -> StepResult:
"""Concurrent retrieval of exit code, stdout and stdout of a container.

Expand All @@ -230,9 +268,11 @@ async def get_step_result(self, container: Container) -> StepResult:
StepResult: Failure or success with stdout and stderr.
"""
exit_code, stdout, stderr = await get_exec_result(container)
if self.context.is_local and self.should_persist_stdout_stderr_logs:
await self.write_log_files(stdout, stderr)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=container,
Expand All @@ -249,31 +289,7 @@ def _get_timed_out_step_result(self) -> StepResult:
class PytestStep(Step, ABC):
"""An abstract class to run pytest tests and evaluate success or failure according to pytest logs."""

async def write_log_file(self, logs: str) -> Path:
    """Write the pytest logs to a file in the connector code directory and return its path.

    Args:
        logs (str): The pytest output to persist.

    Returns:
        Path: The path to the written pytest log file.
    """
    log_directory = Path(f"{self.context.connector.code_directory}/airbyte_ci_logs")
    # parents=True for consistency with Step.write_log_files and to avoid failing
    # when intermediate directories are missing.
    await log_directory.mkdir(exist_ok=True, parents=True)
    log_path = await (log_directory / f"{slugify(self.title).replace('-', '_')}.log").resolve()
    await log_path.write_text(logs)
    self.logger.info(f"Pytest logs written to {log_path}")
    # The docstring promised the path but the original returned None; callers that
    # ignore the return value are unaffected.
    return log_path

# TODO this is not very robust if pytest crashes and does not output its expected last log line.
def pytest_logs_to_step_result(self, logs: str) -> StepResult:
    """Parse pytest log and infer failure, success or skipping.

    Args:
        logs (str): The pytest logs.

    Returns:
        StepResult: The inferred step result according to the last log line:
        FAILURE if it mentions "failed" or "errors" (or if there are no log
        lines at all), SKIPPED if it mentions "no tests ran", SUCCESS otherwise.
    """
    # splitlines() ignores a trailing newline, so the last element is the final
    # real log line. The original `logs.split("\n")[-2]` raised IndexError on
    # empty or single-line output; treat that case as a failure since nothing
    # can be inferred from it.
    log_lines = logs.splitlines()
    if not log_lines:
        return StepResult(self, StepStatus.FAILURE, stderr=logs)
    last_log_line = log_lines[-1]
    if "failed" in last_log_line or "errors" in last_log_line:
        return StepResult(self, StepStatus.FAILURE, stderr=logs)
    elif "no tests ran" in last_log_line:
        return StepResult(self, StepStatus.SKIPPED, stdout=logs)
    else:
        return StepResult(self, StepStatus.SUCCESS, stdout=logs)
skipped_exit_code = 5

async def _run_tests_in_directory(self, connector_under_test: Container, test_directory: str) -> StepResult:
"""Run the pytest tests in the test_directory that was passed.
Expand All @@ -294,18 +310,13 @@ async def _run_tests_in_directory(self, connector_under_test: Container, test_di
"python",
"-m",
"pytest",
"--suppress-tests-failed-exit-code",
"--suppress-no-test-exit-code",
"-s",
test_directory,
"-c",
test_config,
]
)
logs = await tester.stdout()
if self.context.is_local:
await self.write_log_file(logs)
return self.pytest_logs_to_step_result(logs)
return await self.get_step_result(tester)

else:
return StepResult(self, StepStatus.SKIPPED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from pipelines.actions import environments
from pipelines.bases import StepResult, StepStatus
from pipelines.bases import StepResult
from pipelines.gradle import GradleTask
from pipelines.utils import get_exec_result

Expand All @@ -25,7 +25,7 @@ async def _run(self) -> StepResult:
exit_code, stdout, stderr = await get_exec_result(formatted)
return StepResult(
self,
StepStatus.from_exit_code(exit_code),
self.get_step_status_from_exit_code(exit_code),
stderr=stderr,
stdout=stdout,
output_artifact=formatted.directory(str(self.context.connector.code_directory)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncer
from pipelines.actions import environments
from pipelines.bases import Step, StepResult, StepStatus
from pipelines.bases import Step, StepResult
from pipelines.utils import with_exit_code, with_stderr, with_stdout


Expand Down Expand Up @@ -50,7 +50,7 @@ async def _run(self) -> StepResult:

return StepResult(
self,
StepStatus.from_exit_code(soon_exit_code.value),
self.get_step_status_from_exit_code(await soon_exit_code),
stderr=soon_stderr.value,
stdout=soon_stdout.value,
output_artifact=formatted.directory("/connector_code"),
Expand Down
29 changes: 27 additions & 2 deletions airbyte-ci/connectors/pipelines/pipelines/hacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@
from __future__ import annotations

from copy import deepcopy
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Callable, List

import requests
import yaml
from connector_ops.utils import ConnectorLanguage
from dagger import DaggerError

if TYPE_CHECKING:
from dagger import Client, Container, Directory
from pipelines.contexts import ConnectorContext
from dagger import Client, Directory


LINES_TO_REMOVE_FROM_GRADLE_FILE = [
Expand Down Expand Up @@ -140,3 +140,28 @@ async def cache_latest_cdk(dagger_client: Client, pip_cache_volume_name: str = "
.with_exec(["pip", "install", "--force-reinstall", f"airbyte-cdk=={cdk_latest_version}"])
.sync()
)


def never_fail_exec(command: List[str]) -> Callable:
    """Wrap a command so its container exec always exits 0, persisting the real exit code.

    Underlying issue:
    A classic dagger ``with_exec`` raises an ``ExecError`` whenever the command
    returns a non-zero exit code. That is fine for most container interactions,
    but some executions — running CAT, for instance — are expected to fail often,
    and we still need to interact with the container afterwards (e.g. to pull
    updated secrets written during the test run). The ``sh -c`` wrapper below
    forces a 0 exit code while writing the true exit code to ``/exit_code``,
    which the pipeline reads later to determine the container's real outcome.

    Args:
        command (List[str]): The command to run in the container.

    Returns:
        Callable: A function (suitable for ``Container.with_``) applying the wrapped exec.
    """

    def wrap(container: Container):
        joined = " ".join(command)
        return container.with_exec(["sh", "-c", f"{joined}; echo $? > /exit_code"])

    return wrap
Loading