diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index 4f26d9ad07fe..552e103f2bdb 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -264,11 +264,24 @@ flowchart TD | `--fail-fast` | False | False | Abort after any tests fail, rather than continuing to run additional tests. Use this setting to confirm a known bug is fixed (or not), or when you only require a pass/fail result. | | `--code-tests-only` | True | False | Skip any tests not directly related to code updates. For instance, metadata checks, version bump checks, changelog verification, etc. Use this setting to help focus on code quality during development. | | `--concurrent-cat` | False | False | Make CAT tests run concurrently using pytest-xdist. Be careful about source or destination API rate limits. | +| `--.=` | True | | You can pass extra parameters for specific test steps. More details in the extra parameters section below | Note: - The above options are implemented for Java connectors but may not be available for Python connectors. If an option is not supported, the pipeline will not fail but instead the 'default' behavior will be executed. +#### Extra parameters +You can pass extra parameters to the following steps: +* `unit` +* `integration` +* `acceptance` + +This allows you to override the default parameters of these steps. +For example, you can only run the `test_read` test of the acceptance test suite with: +`airbyte-ci connectors --name=source-pokeapi test --acceptance.-k=test_read` +Here the `-k` parameter is passed to the pytest command running acceptance tests. +Please keep in mind that the extra parameters are not validated by the CLI: if you pass an invalid parameter, you'll face a late failure during the pipeline execution. + ### `connectors build` command Run a build pipeline for one or multiple connectors and export the built docker image to the local docker host. @@ -521,6 +534,7 @@ E.G.: running `pytest` on a specific test folder: | Version | PR | Description | | ------- | ---------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------- | +| 3.2.0 | [#34050](https://github.com/airbytehq/airbyte/pull/34050) | Connector test steps can take extra parameters | | 3.1.3 | [#34136](https://github.com/airbytehq/airbyte/pull/34136) | Fix issue where dagger excludes were not being properly applied | | 3.1.2 | [#33972](https://github.com/airbytehq/airbyte/pull/33972) | Remove secrets scrubbing hack for --is-local and other small tweaks. | | 3.1.1 | [#33979](https://github.com/airbytehq/airbyte/pull/33979) | Fix AssertionError on report existence again | diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py index 9120fc07e0cf..dff4f9b2a736 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/context.py @@ -19,8 +19,8 @@ from pipelines.consts import BUILD_PLATFORMS from pipelines.dagger.actions import secrets from pipelines.helpers.connectors.modifed import ConnectorWithModifiedFiles +from pipelines.helpers.execution.run_steps import RunStepOptions from pipelines.helpers.github import update_commit_status_check -from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.slack import send_message_to_webhook from pipelines.helpers.utils import METADATA_FILE_NAME from pipelines.models.contexts.pipeline_context import PipelineContext diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py index 00bdffeabccf..07f48bc5ca75 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/commands.py @@ -3,7 +3,7 @@ # import sys -from typing import List +from typing import Dict, List import asyncclick as click from pipelines import main_logger @@ -13,12 +13,20 @@ from pipelines.airbyte_ci.connectors.test.pipeline import run_connector_test_pipeline from pipelines.cli.dagger_pipeline_command import DaggerPipelineCommand from pipelines.consts import LOCAL_BUILD_PLATFORM, ContextState +from pipelines.helpers.execution import argument_parsing +from pipelines.helpers.execution.run_steps import RunStepOptions from pipelines.helpers.github import update_global_commit_status_check_for_tests -from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.utils import fail_if_missing_docker_hub_creds +from pipelines.models.steps import STEP_PARAMS -@click.command(cls=DaggerPipelineCommand, help="Test all the selected connectors.") +@click.command( + cls=DaggerPipelineCommand, + help="Test all the selected connectors.", + context_settings=dict( + ignore_unknown_options=True, + ), +) @click.option( "--code-tests-only", is_flag=True, @@ -47,6 +55,9 @@ type=click.Choice([step_id.value for step_id in CONNECTOR_TEST_STEP_ID]), help="Skip a step by name. Can be used multiple times to skip multiple steps.", ) +@click.argument( + "extra_params", nargs=-1, type=click.UNPROCESSED, callback=argument_parsing.build_extra_params_mapping(CONNECTOR_TEST_STEP_ID) +) @click.pass_context async def test( ctx: click.Context, @@ -54,6 +65,7 @@ async def test( fail_fast: bool, concurrent_cat: bool, skip_step: List[str], + extra_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS], ) -> bool: """Runs a test pipeline for the selected connectors. @@ -76,8 +88,8 @@ async def test( run_step_options = RunStepOptions( fail_fast=fail_fast, skip_steps=[CONNECTOR_TEST_STEP_ID(step_id) for step_id in skip_step], + step_params=extra_params, ) - connectors_tests_contexts = [ ConnectorContext( pipeline_name=f"Testing connector {connector.technical_name}", diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py index 82b9b0099efc..d1b875a1c180 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/pipeline.py @@ -3,6 +3,9 @@ # """This module groups factory like functions to dispatch tests steps according to the connector under test language.""" +from __future__ import annotations + +from typing import TYPE_CHECKING import anyio from connector_ops.utils import ConnectorLanguage # type: ignore @@ -12,7 +15,11 @@ from pipelines.airbyte_ci.connectors.test.steps import java_connectors, python_connectors from pipelines.airbyte_ci.connectors.test.steps.common import QaChecks, VersionFollowsSemverCheck, VersionIncrementCheck from pipelines.airbyte_ci.metadata.pipeline import MetadataValidation -from pipelines.helpers.run_steps import STEP_TREE, StepToRun, run_steps +from pipelines.helpers.execution.run_steps import StepToRun, run_steps + +if TYPE_CHECKING: + + from pipelines.helpers.execution.run_steps import STEP_TREE LANGUAGE_MAPPING = { "get_test_steps": { @@ -30,7 +37,7 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: context (ConnectorContext): The current connector context. Returns: - List[StepResult]: The list of tests steps. + STEP_TREE: The list of tests steps. """ if _get_test_steps := LANGUAGE_MAPPING["get_test_steps"].get(context.connector.language): return _get_test_steps(context) @@ -43,11 +50,12 @@ async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyi """ Compute the steps to run for a connector test pipeline. """ + all_steps_to_run: STEP_TREE = [] - steps_to_run = get_test_steps(context) + all_steps_to_run += get_test_steps(context) if not context.code_tests_only: - steps_to_run += [ + static_analysis_steps_to_run = [ [ StepToRun(id=CONNECTOR_TEST_STEP_ID.METADATA_VALIDATION, step=MetadataValidation(context)), StepToRun(id=CONNECTOR_TEST_STEP_ID.VERSION_FOLLOW_CHECK, step=VersionFollowsSemverCheck(context)), @@ -55,11 +63,12 @@ async def run_connector_test_pipeline(context: ConnectorContext, semaphore: anyi StepToRun(id=CONNECTOR_TEST_STEP_ID.QA_CHECKS, step=QaChecks(context)), ] ] + all_steps_to_run += static_analysis_steps_to_run async with semaphore: async with context: result_dict = await run_steps( - runnables=steps_to_run, + runnables=all_steps_to_run, options=context.run_step_options, ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py index f7139fe885bd..dc780ac50f1a 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/common.py @@ -21,7 +21,7 @@ from pipelines.dagger.actions import secrets from pipelines.dagger.containers import internal_tools from pipelines.helpers.utils import METADATA_FILE_NAME -from pipelines.models.steps import Step, StepResult, StepStatus +from pipelines.models.steps import STEP_PARAMS, Step, StepResult, StepStatus class VersionCheck(Step, ABC): @@ -193,6 +193,20 @@ class AcceptanceTests(Step): CONTAINER_TEST_INPUT_DIRECTORY = "/test_input" CONTAINER_SECRETS_DIRECTORY = "/test_input/secrets" skipped_exit_code = 5 + accept_extra_params = True + + @property + def default_params(self) -> STEP_PARAMS: + """Default pytest options. + + Returns: + dict: The default pytest options. + """ + return super().default_params | { + "-ra": [], # Show extra test summary info in the report for all but the passed tests + "--disable-warnings": [], # Disable warnings in the pytest report + "--durations": ["3"], # Show the 3 slowest tests in the report + } @property def base_cat_command(self) -> List[str]: @@ -200,14 +214,12 @@ def base_cat_command(self) -> List[str]: "python", "-m", "pytest", - "--disable-warnings", - "--durations=3", # Show the 3 slowest tests in the report - "-ra", # Show extra test summary info in the report for all but the passed tests "-p", # Load the connector_acceptance_test plugin "connector_acceptance_test.plugin", "--acceptance-test-config", self.CONTAINER_TEST_INPUT_DIRECTORY, ] + if self.concurrent_test_run: command += ["--numprocesses=auto"] # Using pytest-xdist to run tests in parallel, auto means using all available cores return command @@ -232,7 +244,7 @@ async def get_cat_command(self, connector_dir: Directory) -> List[str]: if "integration_tests" in await connector_dir.entries(): if "acceptance.py" in await connector_dir.directory("integration_tests").entries(): cat_command += ["-p", "integration_tests.acceptance"] - return cat_command + return cat_command + self.params_as_cli_options async def _run(self, connector_under_test_container: Container) -> StepResult: """Run the acceptance test suite on a connector dev image. Build the connector acceptance test image if the tag is :dev. diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py index 5ca660a1112d..06b0aaea4314 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/java_connectors.py @@ -21,24 +21,30 @@ from pipelines.airbyte_ci.steps.gradle import GradleTask from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions.system import docker -from pipelines.helpers.run_steps import StepToRun +from pipelines.helpers.execution.run_steps import StepToRun from pipelines.helpers.utils import export_container_to_tarball -from pipelines.models.steps import StepResult, StepStatus +from pipelines.models.steps import STEP_PARAMS, StepResult, StepStatus if TYPE_CHECKING: from typing import Callable, Dict, List, Optional - from pipelines.helpers.run_steps import RESULTS_DICT, STEP_TREE + from pipelines.helpers.execution.run_steps import RESULTS_DICT, STEP_TREE class IntegrationTests(GradleTask): """A step to run integrations tests for Java connectors using the integrationTestJava Gradle task.""" title = "Java Connector Integration Tests" - gradle_task_name = "integrationTestJava -x buildConnectorImage -x assemble" + gradle_task_name = "integrationTestJava" mount_connector_secrets = True bind_to_docker_host = True + @property + def default_params(self) -> STEP_PARAMS: + return super().default_params | { + "-x": ["buildConnectorImage", "assemble"], # Exclude the buildConnectorImage and assemble tasks + } + async def _load_normalization_image(self, normalization_tar_file: File) -> None: normalization_image_tag = f"{self.context.connector.normalization_repository}:dev" self.context.logger.info("Load the normalization image to the docker host.") diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 65117b6d804b..5b2d71c465c6 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -16,8 +16,8 @@ from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, CheckBaseImageIsUsed from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets -from pipelines.helpers.run_steps import STEP_TREE, StepToRun -from pipelines.models.steps import Step, StepResult +from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun +from pipelines.models.steps import STEP_PARAMS, Step, StepResult class PytestStep(Step, ABC): @@ -31,6 +31,18 @@ class PytestStep(Step, ABC): skipped_exit_code = 5 bind_to_docker_host = False + accept_extra_params = True + + @property + def default_params(self) -> STEP_PARAMS: + """Default pytest options. + + Returns: + dict: The default pytest options. + """ + return super().default_params | { + "-s": [], # Disable capturing stdout/stderr in pytest + } @property @abstractmethod @@ -43,15 +55,6 @@ def extra_dependencies_names(self) -> Sequence[str]: return ("dev",) return ("dev", "tests") - @property - def additional_pytest_options(self) -> List[str]: - """Theses options are added to the pytest command. - - Returns: - List[str]: The additional pytest options. - """ - return [] - async def _run(self, connector_under_test: Container) -> StepResult: """Run all pytest tests declared in the test directory of the connector code. @@ -83,7 +86,7 @@ def get_pytest_command(self, test_config_file_name: str) -> List[str]: Returns: List[str]: The pytest command to run. """ - cmd = ["pytest", "-s", self.test_directory_name, "-c", test_config_file_name] + self.additional_pytest_options + cmd = ["pytest", self.test_directory_name, "-c", test_config_file_name] + self.params_as_cli_options if self.context.connector.is_using_poetry: return ["poetry", "run"] + cmd return cmd @@ -174,18 +177,16 @@ class UnitTests(PytestStep): MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS = 90 @property - def additional_pytest_options(self) -> List[str]: + def default_params(self) -> STEP_PARAMS: """Make sure the coverage computation is run for the unit tests. - Fail if the coverage is under 90% for certified connectors. Returns: - List[str]: The additional pytest options to run coverage reports. + dict: The default pytest options. """ - coverage_options = ["--cov", self.context.connector.technical_name.replace("-", "_")] + coverage_options = {"--cov": [self.context.connector.technical_name.replace("-", "_")]} if self.context.connector.support_level == "certified": - coverage_options += ["--cov-fail-under", str(self.MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS)] - - return super().additional_pytest_options + coverage_options + coverage_options["--cov-fail-under"] = [str(self.MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS)] + return super().default_params | coverage_options class IntegrationTests(PytestStep): diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py index eae516a8db79..4860decdaf73 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/metadata/pipeline.py @@ -13,7 +13,7 @@ from pipelines.consts import DOCS_DIRECTORY_ROOT_PATH, INTERNAL_TOOL_PATHS from pipelines.dagger.actions.python.common import with_pip_packages from pipelines.dagger.containers.python import with_python_base -from pipelines.helpers.run_steps import STEP_TREE, StepToRun, run_steps +from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun, run_steps from pipelines.helpers.utils import DAGGER_CONFIG, get_secret_host_variable from pipelines.models.reports import Report from pipelines.models.steps import MountPath, Step, StepResult diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py index 94e0cb8ff769..ae44de953449 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/steps/gradle.py @@ -11,7 +11,7 @@ from pipelines.consts import AMAZONCORRETTO_IMAGE from pipelines.dagger.actions import secrets from pipelines.helpers.utils import sh_dash_c -from pipelines.models.steps import Step, StepResult +from pipelines.models.steps import STEP_PARAMS, Step, StepResult class GradleTask(Step, ABC): @@ -27,14 +27,23 @@ class GradleTask(Step, ABC): context: ConnectorContext - DEFAULT_GRADLE_TASK_OPTIONS = ("--no-daemon", "--no-watch-fs", "--scan", "--build-cache", "--console=plain") LOCAL_MAVEN_REPOSITORY_PATH = "/root/.m2" GRADLE_DEP_CACHE_PATH = "/root/gradle-cache" GRADLE_HOME_PATH = "/root/.gradle" - + STATIC_GRADLE_TASK_OPTIONS = ("--no-daemon", "--no-watch-fs") gradle_task_name: ClassVar[str] bind_to_docker_host: ClassVar[bool] = False mount_connector_secrets: ClassVar[bool] = False + accept_extra_params = True + + @property + def default_params(self) -> STEP_PARAMS: + return super().default_params | { + "-Ds3BuildCachePrefix": [self.context.connector.technical_name], # Set the S3 build cache prefix. + "--build-cache": [], # Enable the gradle build cache. + "--scan": [], # Enable the gradle build scan. + "--console": ["plain"], # Disable the gradle rich console. + } @property def dependency_cache_volume(self) -> CacheVolume: @@ -56,7 +65,7 @@ def build_include(self) -> List[str]: ] def _get_gradle_command(self, task: str, *args: Any) -> str: - return f"./gradlew {' '.join(self.DEFAULT_GRADLE_TASK_OPTIONS + args)} {task}" + return f"./gradlew {' '.join(self.STATIC_GRADLE_TASK_OPTIONS + args)} {task}" async def _run(self, *args: Any, **kwargs: Any) -> StepResult: include = [ @@ -191,7 +200,7 @@ async def _run(self, *args: Any, **kwargs: Any) -> StepResult: # Warm the gradle cache. f"(rsync -a --stats --mkpath {self.GRADLE_DEP_CACHE_PATH}/ {self.GRADLE_HOME_PATH} || true)", # Run the gradle task. - self._get_gradle_command(connector_task, f"-Ds3BuildCachePrefix={self.context.connector.technical_name}"), + self._get_gradle_command(connector_task, *self.params_as_cli_options), ] ) ) diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/__init__.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/argument_parsing.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/argument_parsing.py new file mode 100644 index 000000000000..af32aa52b213 --- /dev/null +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/argument_parsing.py @@ -0,0 +1,66 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +from __future__ import annotations + +import re +from typing import TYPE_CHECKING + +import asyncclick as click + +if TYPE_CHECKING: + from enum import Enum + from typing import Callable, Dict, Tuple, Type + + from pipelines.models.steps import STEP_PARAMS + +# Pattern for extra param options: --.= +EXTRA_PARAM_PATTERN_FOR_OPTION = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)=([^=]+)$") +# Pattern for extra param flag: --. +EXTRA_PARAM_PATTERN_FOR_FLAG = re.compile(r"^--([a-zA-Z_][a-zA-Z0-9_]*)\.([a-zA-Z_-][a-zA-Z0-9_-]*)$") +EXTRA_PARAM_PATTERN_ERROR_MESSAGE = "The extra flags must be structured as --. for flags or --.= for options. You can use - or -- for option/flag names." + + +def build_extra_params_mapping(SupportedStepIds: Type[Enum]) -> Callable: + def callback(ctx: click.Context, argument: click.core.Argument, raw_extra_params: Tuple[str]) -> Dict[str, STEP_PARAMS]: + """Build a mapping of step id to extra params. + Validate the extra params and raise a ValueError if they are invalid. + Validation rules: + - The extra params must be structured as --.= for options or --. for flags. + - The step id must be one of the existing step ids. + + + Args: + ctx (click.Context): The click context. + argument (click.core.Argument): The click argument. + raw_extra_params (Tuple[str]): The extra params provided by the user. + Raises: + ValueError: Raised if the extra params format is invalid. + ValueError: Raised if the step id in the extra params is not one of the unique steps to run. + + Returns: + Dict[Literal, STEP_PARAMS]: The mapping of step id to extra params. + """ + extra_params_mapping: Dict[str, STEP_PARAMS] = {} + for param in raw_extra_params: + is_flag = "=" not in param + pattern = EXTRA_PARAM_PATTERN_FOR_FLAG if is_flag else EXTRA_PARAM_PATTERN_FOR_OPTION + matches = pattern.match(param) + if not matches: + raise ValueError(f"Invalid parameter {param}. {EXTRA_PARAM_PATTERN_ERROR_MESSAGE}") + if is_flag: + step_name, param_name = matches.groups() + param_value = None + else: + step_name, param_name, param_value = matches.groups() + try: + step_id = SupportedStepIds(step_name).value + except ValueError: + raise ValueError(f"Invalid step name {step_name}, it must be one of {[step_id.value for step_id in SupportedStepIds]}") + + extra_params_mapping.setdefault(step_id, {}).setdefault(param_name, []) + # param_value is None if the param is a flag + if param_value is not None: + extra_params_mapping[step_id][param_name].append(param_value) + return extra_params_mapping + + return callback diff --git a/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py similarity index 95% rename from airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py rename to airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py index 7a66acad61d7..8fb320d9fd5e 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/helpers/run_steps.py +++ b/airbyte-ci/connectors/pipelines/pipelines/helpers/execution/run_steps.py @@ -16,7 +16,8 @@ from pipelines.models.steps import StepStatus if TYPE_CHECKING: - from pipelines.models.steps import Step, StepResult + from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID + from pipelines.models.steps import STEP_PARAMS, Step, StepResult RESULTS_DICT = Dict[str, StepResult] ARGS_TYPE = Union[Dict, Callable[[RESULTS_DICT], Dict], Awaitable[Dict]] @@ -34,6 +35,7 @@ class RunStepOptions: skip_steps: List[str] = field(default_factory=list) log_step_tree: bool = True concurrency: int = 10 + step_params: Dict[CONNECTOR_TEST_STEP_ID, STEP_PARAMS] = field(default_factory=dict) @dataclass(frozen=True) @@ -44,7 +46,7 @@ class StepToRun: Used to coordinate the execution of multiple steps inside a pipeline. """ - id: str + id: CONNECTOR_TEST_STEP_ID step: Step args: ARGS_TYPE = field(default_factory=dict) depends_on: List[str] = field(default_factory=list) @@ -71,7 +73,7 @@ def _skip_remaining_steps(remaining_steps: STEP_TREE) -> RESULTS_DICT: """ Skip all remaining steps. """ - skipped_results = {} + skipped_results: Dict[str, StepResult] = {} for runnable_step in remaining_steps: if isinstance(runnable_step, StepToRun): skipped_results[runnable_step.id] = runnable_step.step.skip() @@ -243,6 +245,7 @@ async def run_steps( tasks.append(task_group.soonify(run_steps)(list(step_to_run), results, options)) else: step_args = await evaluate_run_args(step_to_run.args, results) + step_to_run.step.extra_params = options.step_params.get(step_to_run.id, {}) main_logger.info(f"QUEUING STEP {step_to_run.id}") tasks.append(task_group.soonify(step_to_run.step.run)(**step_args)) diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py index 83b1cb8e4b6d..0d2431560145 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/contexts/pipeline_context.py @@ -18,9 +18,9 @@ from github import PullRequest from pipelines.airbyte_ci.connectors.reports import ConnectorReport from pipelines.consts import CIContext, ContextState +from pipelines.helpers.execution.run_steps import RunStepOptions from pipelines.helpers.gcs import sanitize_gcs_credentials from pipelines.helpers.github import update_commit_status_check -from pipelines.helpers.run_steps import RunStepOptions from pipelines.helpers.slack import send_message_to_webhook from pipelines.helpers.utils import AIRBYTE_REPO_URL from pipelines.models.reports import Report diff --git a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py index 7a285c8ec4a8..bc3acafebc06 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/models/steps.py +++ b/airbyte-ci/connectors/pipelines/pipelines/models/steps.py @@ -10,7 +10,7 @@ from datetime import datetime, timedelta from enum import Enum from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Dict, List import anyio import asyncer @@ -25,10 +25,13 @@ from pipelines.airbyte_ci.format.format_command import FormatCommand from pipelines.models.contexts.pipeline_context import PipelineContext + from abc import ABC from rich.style import Style +STEP_PARAMS = Dict[str, List[str]] + @dataclass class MountPath: @@ -155,14 +158,50 @@ class Step(ABC): # The max duration of a step run. If the step run for more than this duration it will be considered as timed out. # The default of 5 hours is arbitrary and can be changed if needed. max_duration: ClassVar[timedelta] = timedelta(hours=5) - retry_delay = timedelta(seconds=10) + accept_extra_params: bool = False def __init__(self, context: PipelineContext) -> None: # noqa D107 self.context = context self.retry_count = 0 self.started_at: Optional[datetime] = None self.stopped_at: Optional[datetime] = None + self._extra_params: STEP_PARAMS = {} + + @property + def extra_params(self) -> STEP_PARAMS: + return self._extra_params + + @extra_params.setter + def extra_params(self, value: STEP_PARAMS) -> None: + if value and not self.accept_extra_params: + raise ValueError(f"{self.__class__.__name__} does not accept extra params.") + self._extra_params = value + self.logger.info(f"Will run with the following parameters: {self.params}") + + @property + def default_params(self) -> STEP_PARAMS: + return {} + + @property + def params(self) -> STEP_PARAMS: + return self.default_params | self.extra_params + + @property + def params_as_cli_options(self) -> List[str]: + """Return the step params as a list of CLI options. + + Returns: + List[str]: The step params as a list of CLI options. + """ + cli_options: List[str] = [] + for name, values in self.params.items(): + if not values: + # If no values are available, we assume it is a flag + cli_options.append(name) + else: + cli_options.extend(f"{name}={value}" for value in values) + return cli_options @property def title(self) -> str: diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index 256d1075bad8..ef811f62aaca 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "3.1.3" +version = "3.2.0" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] diff --git a/airbyte-ci/connectors/pipelines/tests/test_gradle.py b/airbyte-ci/connectors/pipelines/tests/test_gradle.py index 1435d0d5ffcf..5e867c3582ba 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_gradle.py +++ b/airbyte-ci/connectors/pipelines/tests/test_gradle.py @@ -18,6 +18,7 @@ class TestGradleTask: class DummyStep(gradle.GradleTask): gradle_task_name = "dummyTask" + title = "Dummy Step" async def _run(self) -> steps.StepResult: return steps.StepResult(self, steps.StepStatus.SUCCESS) @@ -35,3 +36,21 @@ def test_context(self, mocker, dagger_client): async def test_build_include(self, test_context): step = self.DummyStep(test_context) assert step.build_include + + def test_params(self, test_context): + step = self.DummyStep(test_context) + assert set(step.params_as_cli_options) == { + f"-Ds3BuildCachePrefix={test_context.connector.technical_name}", + "--build-cache", + "--scan", + "--console=plain", + } + step.extra_params = {"-x": ["dummyTask", "dummyTask2"], "--console": ["rich"]} + assert set(step.params_as_cli_options) == { + f"-Ds3BuildCachePrefix={test_context.connector.technical_name}", + "--build-cache", + "--scan", + "--console=rich", + "-x=dummyTask", + "-x=dummyTask2", + } diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/__init__.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_argument_parsing.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_argument_parsing.py new file mode 100644 index 000000000000..7201a2b83059 --- /dev/null +++ b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_argument_parsing.py @@ -0,0 +1,36 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +import enum +import time + +import anyio +import pytest +from pipelines.helpers.execution import argument_parsing + + +class SupportedStepIds(enum.Enum): + STEP1 = "step1" + STEP2 = "step2" + STEP3 = "step3" + + +def test_build_extra_params_mapping(mocker): + ctx = mocker.Mock() + argument = mocker.Mock() + + raw_extra_params = ( + "--step1.param1=value1", + "--step2.param2=value2", + "--step3.param3=value3", + "--step1.param4", + ) + + result = argument_parsing.build_extra_params_mapping(SupportedStepIds)(ctx, argument, raw_extra_params) + + expected_result = { + SupportedStepIds.STEP1.value: {"param1": ["value1"], "param4": []}, + SupportedStepIds.STEP2.value: {"param2": ["value2"]}, + SupportedStepIds.STEP3.value: {"param3": ["value3"]}, + } + + assert result == expected_result diff --git a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py similarity index 95% rename from airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py rename to airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py index e0ac8d3af4b1..cc2f2e4cb7c6 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_helpers/test_run_steps.py +++ b/airbyte-ci/connectors/pipelines/tests/test_helpers/test_execution/test_run_steps.py @@ -4,7 +4,7 @@ import anyio import pytest -from pipelines.helpers.run_steps import InvalidStepConfiguration, RunStepOptions, StepToRun, run_steps +from pipelines.helpers.execution.run_steps import InvalidStepConfiguration, RunStepOptions, StepToRun, run_steps from pipelines.models.contexts.pipeline_context import PipelineContext from pipelines.models.steps import Step, StepResult, StepStatus @@ -346,3 +346,16 @@ async def test_run_steps_throws_on_invalid_args(invalid_args): with pytest.raises(TypeError): await run_steps(steps) + + +@pytest.mark.anyio +async def test_run_steps_with_params(): + steps = [StepToRun(id="step1", step=TestStep(test_context))] + options = RunStepOptions(fail_fast=True, step_params={"step1": {"--param1": ["value1"]}}) + TestStep.accept_extra_params = False + with pytest.raises(ValueError): + await run_steps(steps, options=options) + assert steps[0].step.params_as_cli_options == [] + TestStep.accept_extra_params = True + await run_steps(steps, options=options) + assert steps[0].step.params_as_cli_options == ["--param1=value1"] diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py index 567136eeb264..a9c1470b6dd7 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_common.py @@ -241,6 +241,12 @@ async def test_cat_container_caching( fourth_date_result = await cat_container.stdout() assert fourth_date_result != third_date_result + async def test_params(self, dagger_client, mocker, test_context_ci, test_input_dir): + acceptance_test_step = self.get_patched_acceptance_test_step(dagger_client, mocker, test_context_ci, test_input_dir) + assert set(acceptance_test_step.params_as_cli_options) == {"-ra", "--disable-warnings", "--durations=3"} + acceptance_test_step.extra_params = {"--durations": ["5"], "--collect-only": []} + assert set(acceptance_test_step.params_as_cli_options) == {"-ra", "--disable-warnings", "--durations=5", "--collect-only"} + class TestCheckBaseImageIsUsed: @pytest.fixture diff --git a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py index 4063c782c05f..2d89af9ec94d 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py @@ -97,3 +97,11 @@ async def test__run_for_poetry(self, context_for_connector_with_poetry, containe context_for_connector_with_poetry.connector.technical_name in pip_freeze_output ), "The connector should be installed in the test environment." assert "pytest" in pip_freeze_output, "The pytest package should be installed in the test environment." + + def test_params(self, context_for_certified_connector_with_setup): + step = UnitTests(context_for_certified_connector_with_setup) + assert step.params_as_cli_options == [ + "-s", + f"--cov={context_for_certified_connector_with_setup.connector.technical_name.replace('-', '_')}", + f"--cov-fail-under={step.MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS}", + ]