Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

airbyte-ci: Introduce --only-step option for connector tests #34276

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ Test certified connectors:
Test connectors changed on the current branch:
`airbyte-ci connectors --modified test`

Run only the acceptance tests on the modified connectors, restricted to their full refresh tests:
`airbyte-ci connectors --modified test --only-step="acceptance" --acceptance.-k=test_full_refresh`

#### What it runs

```mermaid
Expand Down Expand Up @@ -261,11 +264,12 @@ flowchart TD
| Option | Multiple | Default value | Description |
| ------------------------------------------------------- | -------- | ------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--skip-step/-x` | True | | Skip steps by id e.g. `-x unit -x acceptance` |
| `--only-step/-k` | True | | Only run specific steps by id e.g. `-k unit -k acceptance` |
| `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. |
| `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. |
| `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. |
| `--<step-id>.<extra-parameter>=<extra-parameter-value>` | True | | You can pass extra parameters for specific test steps. More details in the extra parameters section below |
| `--ci-requirements` | False | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use. |
| `--ci-requirements` | False | | Output the CI requirements as a JSON payload. It is used to determine the CI runner to use. |

Note:

Expand Down Expand Up @@ -539,6 +543,7 @@ E.G.: running `pytest` on a specific test folder:

| Version | PR | Description |
| ------- | ---------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------- |
| 3.4.0 | [#34276](https://github.com/airbytehq/airbyte/pull/34276) | Introduce `--only-step` option for connector tests. |
| 3.3.0 | [#34218](https://github.com/airbytehq/airbyte/pull/34218) | Introduce `--ci-requirements` option for client defined CI runners. |
| 3.2.0 | [#34050](https://github.com/airbytehq/airbyte/pull/34050) | Connector test steps can take extra parameters |
| 3.1.3 | [#34136](https://github.com/airbytehq/airbyte/pull/34136) | Fix issue where dagger excludes were not being properly applied |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,19 @@
@click.option(
"--skip-step",
"-x",
"skip_steps",
multiple=True,
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Skip a step by name. Can be used multiple times to skip multiple steps.",
)
@click.option(
"--only-step",
"-k",
"only_steps",
multiple=True,
type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]),
help="Only run specific step by name. Can be used multiple times to keep multiple steps.",
)
@click.argument(
"extra_params", nargs=-1, type=click.UNPROCESSED, callback=argument_parsing.build_extra_params_mapping(CONNECTOR_TEST_STEP_ID)
)
Expand All @@ -66,14 +75,17 @@ async def test(
code_tests_only: bool,
fail_fast: bool,
concurrent_cat: bool,
skip_step: List[str],
skip_steps: List[str],
only_steps: List[str],
extra_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS],
) -> bool:
"""Runs a test pipeline for the selected connectors.

Args:
ctx (click.Context): The click context.
"""
if only_steps and skip_steps:
raise click.UsageError("Cannot use both --only-step and --skip-step at the same time.")
if ctx.obj["is_ci"]:
fail_if_missing_docker_hub_creds(ctx)
if ctx.obj["is_ci"] and ctx.obj["pull_request"] and ctx.obj["pull_request"].draft:
Expand All @@ -89,7 +101,8 @@ async def test(

run_step_options = RunStepOptions(
fail_fast=fail_fast,
skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step],
skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_steps],
keep_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in only_steps],
step_params=extra_params,
)
connectors_tests_contexts = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import inspect
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple, Union
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple, Union

import anyio
import asyncer
Expand All @@ -27,16 +27,78 @@ class InvalidStepConfiguration(Exception):
pass


def _get_dependency_graph(steps: STEP_TREE) -> Dict[str, List[str]]:
    """Flatten a step tree into a mapping of step id to the ids it depends on.

    Nested lists of steps are walked recursively and merged into the same flat
    mapping. When an id appears more than once, the entry seen last wins.

    Args:
        steps (STEP_TREE): The step tree to inspect.

    Returns:
        Dict[str, List[str]]: For every step id, the value of its ``depends_on``.

    Raises:
        Exception: If an item is neither a ``StepToRun`` nor a list.
    """
    graph: Dict[str, List[str]] = {}
    for step in steps:
        if isinstance(step, StepToRun):
            graph[step.id] = step.depends_on
            continue
        if not isinstance(step, list):
            raise Exception(f"Unexpected step type: {type(step)}")
        # A nested group contributes its own entries to the flat graph.
        graph.update(_get_dependency_graph(step))

    return graph


def _get_transitive_dependencies_for_step_id(
dependency_graph: Dict[str, List[str]], step_id: str, visited: Optional[Set[str]] = None
) -> List[str]:
"""Get the transitive dependencies for a step id.

Args:
dependency_graph (Dict[str, str]): The dependency graph to use.
step_id (str): The step id to get the transitive dependencies for.
visited (Optional[Set[str]], optional): The set of visited step ids. Defaults to None.

Returns:
List[str]: List of transitive dependencies as step ids.
"""
if visited is None:
visited = set()

if step_id not in visited:
visited.add(step_id)

dependencies: List[str] = dependency_graph.get(step_id, [])
for dependency in dependencies:
dependencies.extend(_get_transitive_dependencies_for_step_id(dependency_graph, dependency, visited))

return dependencies
else:
return []


@dataclass
class RunStepOptions:
    """Options for the run_step function.

    ``skip_steps`` and ``keep_steps`` are mutually exclusive; setting both raises
    a ``ValueError`` at construction time.
    """

    fail_fast: bool = True
    skip_steps: List[str] = field(default_factory=list)
    keep_steps: List[str] = field(default_factory=list)
    log_step_tree: bool = True
    concurrency: int = 10
    step_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Reject option sets that provide both a skip list and a keep list."""
        has_skip_list = bool(self.skip_steps)
        has_keep_list = bool(self.keep_steps)
        if has_skip_list and has_keep_list:
            raise ValueError("Cannot use both skip_steps and keep_steps at the same time")

    def get_step_ids_to_skip(self, runnables: STEP_TREE) -> List[str]:
        """Compute the ids of the steps that must not run for a given step tree.

        Args:
            runnables (STEP_TREE): The step tree about to be executed.

        Returns:
            List[str]: ``skip_steps`` itself when it is set; otherwise, when
            ``keep_steps`` is set, every step id of the tree that is neither kept
            nor a transitive dependency of a kept step; otherwise an empty list.
        """
        if self.skip_steps:
            return self.skip_steps
        if not self.keep_steps:
            return []

        graph = _get_dependency_graph(runnables)
        # Kept steps need their dependencies to run too, so those are retained as well.
        retained_ids = set(self.keep_steps)
        for kept_id in self.keep_steps:
            retained_ids |= set(_get_transitive_dependencies_for_step_id(graph, kept_id))
        return list(set(graph) - retained_ids)


@dataclass(frozen=True)
class StepToRun:
Expand Down Expand Up @@ -217,6 +279,7 @@ async def run_steps(
if not runnables:
return results

step_ids_to_skip = options.get_step_ids_to_skip(runnables)
# Log the step tree
if options.log_step_tree:
main_logger.info(f"STEP TREE: {runnables}")
Expand All @@ -232,7 +295,7 @@ async def run_steps(
steps_to_evaluate, remaining_steps = _get_next_step_group(runnables)

# Remove any skipped steps
steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, options.skip_steps, results)
steps_to_run, results = _filter_skipped_steps(steps_to_evaluate, step_ids_to_skip, results)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😍


# Run all steps in list concurrently
semaphore = anyio.Semaphore(options.concurrency)
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "3.3.0"
version = "3.4.0"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <contact@airbyte.io>"]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,66 @@ async def test_run_steps_with_params():
TestStep.accept_extra_params = True
await run_steps(steps, options=options)
assert steps[0].step.params_as_cli_options == ["--param1=value1"]


class TestRunStepOptions:
    """Tests for RunStepOptions construction and skip-list computation."""

    def test_init(self):
        """Defaults, explicit values, and the skip/keep exclusivity check."""
        default_options = RunStepOptions()
        assert default_options.fail_fast is True
        assert default_options.concurrency == 10
        assert default_options.skip_steps == []
        assert default_options.step_params == {}

        custom_options = RunStepOptions(
            fail_fast=False,
            concurrency=1,
            skip_steps=["step1"],
            step_params={"step1": {"--param1": ["value1"]}},
        )
        assert custom_options.fail_fast is False
        assert custom_options.concurrency == 1
        assert custom_options.skip_steps == ["step1"]
        assert custom_options.step_params == {"step1": {"--param1": ["value1"]}}

        # Providing both lists is rejected at construction time.
        with pytest.raises(ValueError):
            RunStepOptions(skip_steps=["step1"], keep_steps=["step2"])

    @pytest.mark.parametrize(
        "step_tree, options, expected_skipped_ids",
        [
            # Keeping step4 retains its direct dependencies step3 and step1.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    StepToRun(id="step4", step=TestStep(test_context), depends_on=["step3", "step1"]),
                    StepToRun(id="step5", step=TestStep(test_context)),
                ],
                RunStepOptions(keep_steps=["step4"]),
                {"step2", "step5"},
            ),
            # Keeping step6 retains its whole dependency chain; only step2 is skipped.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    [
                        StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
                        StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
                    ],
                    StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
                ],
                RunStepOptions(keep_steps=["step6"]),
                {"step2"},
            ),
            # An explicit skip list is returned as is.
            (
                [
                    [StepToRun(id="step1", step=TestStep(test_context)), StepToRun(id="step2", step=TestStep(test_context))],
                    StepToRun(id="step3", step=TestStep(test_context)),
                    [
                        StepToRun(id="step4", step=TestStep(test_context), depends_on=["step1"]),
                        StepToRun(id="step6", step=TestStep(test_context), depends_on=["step4", "step5"]),
                    ],
                    StepToRun(id="step5", step=TestStep(test_context), depends_on=["step3"]),
                ],
                RunStepOptions(skip_steps=["step1"]),
                {"step1"},
            ),
        ],
    )
    def test_get_step_ids_to_skip(self, step_tree, options, expected_skipped_ids):
        """The computed skip list matches the expectation regardless of order."""
        assert set(options.get_step_ids_to_skip(step_tree)) == expected_skipped_ids
Loading