diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b33423f00f299..4206f65aefe44 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -357,6 +357,13 @@ repos: files: ^Dockerfile$ pass_filenames: false additional_dependencies: ['rich'] + - id: update-supported-versions + name: Updates supported versions in documentation + entry: ./scripts/ci/pre_commit/supported_versions.py + language: python + files: ^scripts/ci/pre_commit/supported_versions.py$|^README.md$|^docs/apache-airflow/supported-versions.rst$ + pass_filenames: false + additional_dependencies: ['tabulate'] - id: update-version name: Update version to the latest version in the documentation entry: ./scripts/ci/pre_commit/pre_commit_update_versions.py diff --git a/BREEZE.rst b/BREEZE.rst index 03095e682adaa..7659134b42853 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2209,8 +2209,8 @@ This is the current syntax for `./breeze <./breeze>`_: pyupgrade restrict-start_date rst-backticks setup-order setup-extra-packages shellcheck sort-in-the-wild sort-spelling-wordlist stylelint trailing-whitespace ui-lint update-breeze-file update-extras update-local-yml-file update-setup-cfg-file - update-versions vendor-k8s-json-schema verify-db-migrations-documented version-sync - www-lint yamllint yesqa + update-supported-versions update-versions vendor-k8s-json-schema + verify-db-migrations-documented version-sync www-lint yamllint yesqa You can pass extra arguments including options to the pre-commit framework as passed after --. For example: diff --git a/README.md b/README.md index 2379e92af9309..bbc837d06eec7 100644 --- a/README.md +++ b/README.md @@ -261,13 +261,18 @@ packages: Apache Airflow version life cycle: -| Version | Current Patch/Minor | State | First Release | Limited Support | EOL/Terminated | -|---------|---------------------|-----------|---------------|-----------------|----------------| -| 2 | 2.2.3 | Supported | Dec 17, 2020 | TBD | TBD | -| 1.10 | 1.10.15 | EOL | Aug 27, 2018 | Dec 17, 2020 | June 17, 2021 | -| 1.9 | 1.9.0 | EOL | Jan 03, 2018 | Aug 27, 2018 | Aug 27, 2018 | -| 1.8 | 1.8.2 | EOL | Mar 19, 2017 | Jan 03, 2018 | Jan 03, 2018 | -| 1.7 | 1.7.1.2 | EOL | Mar 28, 2016 | Mar 19, 2017 | Mar 19, 2017 | +<!-- This table is automatically updated by pre-commit scripts/ci/pre_commit/supported_versions.py --> +<!-- Beginning of auto-generated table --> + +| Version | Current Patch/Minor | State | First Release | Limited Support | EOL/Terminated | +|-----------|-----------------------|-----------|-----------------|-------------------|------------------| +| 2 | 2.2.3 | Supported | Dec 17, 2020 | TBD | TBD | +| 1.10 | 1.10.15 | EOL | Aug 27, 2018 | Dec 17, 2020 | June 17, 2021 | +| 1.9 | 1.9.0 | EOL | Jan 03, 2018 | Aug 27, 2018 | Aug 27, 2018 | +| 1.8 | 1.8.2 | EOL | Mar 19, 2017 | Jan 03, 2018 | Jan 03, 2018 | +| 1.7 | 1.7.1.2 | EOL | Mar 28, 2016 | Mar 19, 2017 | Mar 19, 2017 | + +<!-- End of auto-generated table --> Limited support versions will be supported with security and critical bug fix only. EOL versions will not get any fixes nor support. diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index 54de6178080f1..73eb37395f2cf 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -278,6 +278,8 @@ require Breeze Docker images to be installed locally.
------------------------------------ ---------------------------------------------------------------- ------------ ``update-setup-cfg-file`` Update setup.cfg file with all licenses ------------------------------------ ---------------------------------------------------------------- ------------ +``update-supported-versions`` Updates supported versions in documentation +------------------------------------ ---------------------------------------------------------------- ------------ ``update-versions`` Updates latest versions in the documentation ------------------------------------ ---------------------------------------------------------------- ------------ ``vendor-k8s-json-schema`` Vendor k8s schema definitions in the helm chart schema file diff --git a/airflow/kubernetes/pod_launcher_deprecated.py b/airflow/kubernetes/pod_launcher_deprecated.py index 2c1410e6d42e3..de9c5f52fe564 100644 --- a/airflow/kubernetes/pod_launcher_deprecated.py +++ b/airflow/kubernetes/pod_launcher_deprecated.py @@ -39,7 +39,7 @@ warnings.warn( """ - Please use :mod: Please use `airflow.providers.cncf.kubernetes.utils.pod_launcher` + Please use :mod:`airflow.providers.cncf.kubernetes.utils.pod_manager` To use this module install the provider package by installing this pip package: @@ -62,7 +62,7 @@ class PodStatus: class PodLauncher(LoggingMixin): """Deprecated class for launching pods. please use - airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher instead + airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead """ def __init__( @@ -74,7 +74,7 @@ def __init__( ): """ Deprecated class for launching pods. please use - airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher instead + airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead Creates the launcher. :param kube_client: kubernetes client diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index c904f0e2e9825..447ab94abeb9b 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -621,6 +621,12 @@ class EksPodOperator(KubernetesPodOperator): empty, then the default boto3 configuration would be used (and must be maintained on each worker node). :type aws_conn_id: str + :param is_delete_operator_pod: What to do when the pod reaches its final + state, or the execution is interrupted. If True, delete the + pod; if False, leave the pod. Current default is False, but this will be + changed in the next major release of this provider. + :type is_delete_operator_pod: bool + """ template_fields: Sequence[str] = tuple( @@ -647,6 +653,7 @@ def __init__( pod_username: Optional[str] = None, aws_conn_id: str = DEFAULT_CONN_ID, region: Optional[str] = None, + is_delete_operator_pod: Optional[bool] = None, **kwargs, ) -> None: if pod_name is None: @@ -658,6 +665,17 @@ ) pod_name = DEFAULT_POD_NAME + if is_delete_operator_pod is None: + warnings.warn( + f"You have not set parameter `is_delete_operator_pod` in class {self.__class__.__name__}. " + "Currently the default for this parameter is `False` but in a future release the default " + "will be changed to `True`.
To ensure pods are not deleted in the future you will need to " + "set `is_delete_operator_pod=False` explicitly.", + DeprecationWarning, + stacklevel=2, + ) + is_delete_operator_pod = False + self.cluster_name = cluster_name self.in_cluster = in_cluster self.namespace = namespace @@ -668,6 +686,7 @@ def __init__( in_cluster=self.in_cluster, namespace=self.namespace, name=self.pod_name, + is_delete_operator_pod=is_delete_operator_pod, **kwargs, ) if pod_username: diff --git a/airflow/providers/amazon/aws/transfers/ftp_to_s3.py b/airflow/providers/amazon/aws/transfers/ftp_to_s3.py index 8bfea305c8076..c52d3e9609910 100644 --- a/airflow/providers/amazon/aws/transfers/ftp_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/ftp_to_s3.py @@ -97,8 +97,8 @@ def __init__( self.encrypt = encrypt self.gzip = gzip self.acl_policy = acl_policy - self.s3_hook = None - self.ftp_hook = None + self.s3_hook: Optional[S3Hook] = None + self.ftp_hook: Optional[FTPHook] = None def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key): with NamedTemporaryFile() as local_tmp_file: @@ -132,12 +132,13 @@ def execute(self, context: 'Context'): if self.ftp_filenames == '*': files = list_dir else: - files = list(filter(lambda file: self.ftp_filenames in file, list_dir)) + ftp_filename: str = self.ftp_filenames + files = list(filter(lambda f: ftp_filename in f, list_dir)) for file in files: self.log.info(f'Moving file {file}') - if self.s3_filenames: + if self.s3_filenames and isinstance(self.s3_filenames, str): filename = file.replace(self.ftp_filenames, self.s3_filenames) else: filename = file diff --git a/airflow/providers/apache/drill/operators/drill.py b/airflow/providers/apache/drill/operators/drill.py index deea6b91270b2..aaa1923bf0050 100644 --- a/airflow/providers/apache/drill/operators/drill.py +++ b/airflow/providers/apache/drill/operators/drill.py @@ -62,7 +62,7 @@ def __init__( self.sql = sql self.drill_conn_id = drill_conn_id self.parameters = parameters - self.hook = None + self.hook: Optional[DrillHook] = None def execute(self, context: 'Context'): self.log.info('Executing: %s on %s', self.sql, self.drill_conn_id) diff --git a/airflow/providers/apache/kylin/operators/kylin_cube.py b/airflow/providers/apache/kylin/operators/kylin_cube.py index 794efb85b03e3..993c8667e0bff 100644 --- a/airflow/providers/apache/kylin/operators/kylin_cube.py +++ b/airflow/providers/apache/kylin/operators/kylin_cube.py @@ -152,6 +152,8 @@ def execute(self, context: 'Context'): _hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn) _support_invoke_command = kylinpy.CubeSource.support_invoke_command + if not self.command: + raise AirflowException(f'Kylin:Command {self.command} can not be empty') if self.command.lower() not in _support_invoke_command: raise AirflowException( f'Kylin:Command {self.command} can not match kylin command list {_support_invoke_command}' diff --git a/airflow/providers/apache/pig/operators/pig.py b/airflow/providers/apache/pig/operators/pig.py index cce970ca1ab0f..9e23cb39e6d8b 100644 --- a/airflow/providers/apache/pig/operators/pig.py +++ b/airflow/providers/apache/pig/operators/pig.py @@ -65,7 +65,7 @@ def __init__( self.pig = pig self.pig_cli_conn_id = pig_cli_conn_id self.pig_opts = pig_opts - self.hook = None + self.hook: Optional[PigCliHook] = None def prepare_template(self): if self.pigparams_jinja_translate: diff --git a/airflow/providers/asana/operators/asana_tasks.pyi b/airflow/providers/asana/operators/asana_tasks.pyi new file mode 100644 index 
0000000000000..59cff93218ebc --- /dev/null +++ b/airflow/providers/asana/operators/asana_tasks.pyi @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Optional + +from airflow.models import BaseOperator + +class AsanaCreateTaskOperator(BaseOperator): + def __init__( + self, + *, + name: str, + conn_id: Optional[str] = None, + task_parameters: Optional[dict] = None, + **kwargs, + ) -> None: ... + +class AsanaUpdateTaskOperator(BaseOperator): + def __init__( + self, + *, + conn_id: Optional[str] = None, + asana_task_gid: str, + task_parameters: dict, + **kwargs, + ) -> None: ... + +class AsanaDeleteTaskOperator(BaseOperator): + def __init__( + self, + *, + conn_id: Optional[str] = None, + asana_task_gid: str, + **kwargs, + ) -> None: ... + +class AsanaFindTaskOperator(BaseOperator): + def __init__( + self, + *, + conn_id: Optional[str] = None, + search_parameters: Optional[dict] = None, + **kwargs, + ) -> None: ... diff --git a/airflow/providers/cncf/kubernetes/CHANGELOG.rst b/airflow/providers/cncf/kubernetes/CHANGELOG.rst index 26f185246bd6d..26f0e96544ab0 100644 --- a/airflow/providers/cncf/kubernetes/CHANGELOG.rst +++ b/airflow/providers/cncf/kubernetes/CHANGELOG.rst @@ -26,10 +26,13 @@ Breaking changes ~~~~~~~~~~~~~~~~ * ``Simplify KubernetesPodOperator (#19572)`` +* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager`` +* :func:`airflow.settings.pod_mutation_hook` is no longer called in :meth:`~cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async`. For ``KubernetesPodOperator``, mutation now occurs in ``build_pod_request_obj``. +* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a temporary basis for debugging purposes and therefore pod deletion is the more sensible default. -.. warning:: Many methods in :class:`~.KubernetesPodOperator` and class:`~.PodLauncher` have been renamed. - If you have subclassed :class:`~.KubernetesPodOperator` will need to update your subclass to reflect - the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``. +.. warning:: Many methods in :class:`~.KubernetesPodOperator` and :class:`~.PodManager` (formerly named ``PodLauncher``) + have been renamed. If you have subclassed :class:`~.KubernetesPodOperator` you will need to update your subclass to + reflect the new structure. Additionally, class ``PodStatus`` has been renamed to ``PodPhase``.
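A minimal migration sketch for the breaking changes above; the task id, pod name and image are placeholders for illustration, not values taken from this patch:

.. code-block:: python

    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    # Old import path (removed by this patch):
    #   from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher
    from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager

    # The default of is_delete_operator_pod is now True; opt out explicitly
    # to keep finished pods around, e.g. while debugging.
    k = KubernetesPodOperator(
        task_id="my_task",  # placeholder
        name="my-pod",  # placeholder
        image="busybox",  # placeholder
        is_delete_operator_pod=False,
    )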
Notes on changes KubernetesPodOperator and PodLauncher `````````````````````````````````````````````````````` @@ -51,7 +54,7 @@ into the top level of ``execute`` because it can be the same for "attached" pods :meth:`~.KubernetesPodOperator.get_or_create_pod` tries first to find an existing pod using labels specific to the task instance (see :meth:`~.KubernetesPodOperator.find_pod`). -If one does not exist it :meth:`creates a pod <~.PodLauncher.create_pod>`. +If one does not exist it :meth:`creates a pod <~.PodManager.create_pod>`. The "waiting" part of execution has three components. The first step is to wait for the pod to leave the ``Pending`` phase (:meth:`~.KubernetesPodOperator.await_pod_start`). Next, if configured to do so, @@ -59,7 +62,7 @@ the operator will :meth:`follow the base container logs <~.KubernetesPodOperator.follow_container_logs>` and forward these logs to the task logger until the ``base`` container is done. If not configured to harvest the logs, the operator will instead :meth:`poll for container completion until done <~.KubernetesPodOperator.await_container_completion>`; either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom -value from the base container, we :meth:`await pod completion <~.PodLauncher.await_pod_completion>`. +value from the base container, we :meth:`await pod completion <~.PodManager.await_pod_completion>`. Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``. @@ -80,7 +83,7 @@ Details on method renames, refactors, and deletions In ``KubernetesPodOperator``: -* Method ``create_pod_launcher`` is converted to cached property ``launcher`` +* Method ``create_pod_launcher`` is converted to cached property ``pod_manager`` * Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client`` * Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``. * Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. @@ -90,7 +93,7 @@ In ``KubernetesPodOperator``: * Method ``_try_numbers_match`` is removed. * Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``. -In ``pod_launcher.py``, in class ``PodLauncher``: +In class ``PodManager`` (formerly ``PodLauncher``): * Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion`` @@ -99,7 +102,7 @@ In ``pod_launcher.py``, in class ``PodLauncher``: * Method ``read_pod_logs`` now takes kwarg ``container_name`` -Other changes in ``pod_launcher.py``: +Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``): * Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased. diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index a296b07ec0456..2a17d88687ea4 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -24,7 +24,8 @@ from kubernetes.client import CoreV1Api, models as k8s -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLaunchFailedException, PodPhase +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodLaunchFailedException, PodManager, PodPhase +from airflow.settings import pod_mutation_hook try: import airflow.utils.yaml as yaml @@ -54,7 +55,7 @@ convert_volume_mount, ) from airflow.providers.cncf.kubernetes.backcompat.pod_runtime_info_env import PodRuntimeInfoEnv -from airflow.providers.cncf.kubernetes.utils import pod_launcher, xcom_sidecar +from airflow.providers.cncf.kubernetes.utils import xcom_sidecar from airflow.utils.helpers import validate_key from airflow.version import version as airflow_version @@ -148,8 +149,8 @@ class KubernetesPodOperator(BaseOperator): :param service_account_name: Name of the service account :type service_account_name: str :param is_delete_operator_pod: What to do when the pod reaches its final - state, or the execution is interrupted. - If False (default): do nothing, If True: delete the pod + state, or the execution is interrupted. If True (default), delete the + pod; if False, leave the pod. :type is_delete_operator_pod: bool :param hostnetwork: If True enable host networking on the pod. 
:type hostnetwork: bool @@ -225,7 +226,7 @@ def __init__( node_selector: Optional[dict] = None, image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None, service_account_name: Optional[str] = None, - is_delete_operator_pod: bool = False, + is_delete_operator_pod: bool = True, hostnetwork: bool = False, tolerations: Optional[List[k8s.V1Toleration]] = None, security_context: Optional[Dict] = None, @@ -346,8 +347,8 @@ def _get_ti_pod_labels(context: Optional[dict] = None, include_try_number: bool return labels @cached_property - def launcher(self) -> pod_launcher.PodLauncher: - return pod_launcher.PodLauncher(kube_client=self.client) + def pod_manager(self) -> PodManager: + return PodManager(kube_client=self.client) @cached_property def client(self) -> CoreV1Api: @@ -384,21 +385,21 @@ def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context): if pod: return pod self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict())) - self.launcher.create_pod(pod=pod_request_obj) + self.pod_manager.create_pod(pod=pod_request_obj) return pod_request_obj def await_pod_start(self, pod): try: - self.launcher.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) + self.pod_manager.await_pod_start(pod=pod, startup_timeout=self.startup_timeout_seconds) except PodLaunchFailedException: if self.log_events_on_failure: - for event in self.launcher.read_pod_events(pod).items: + for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) raise def extract_xcom(self, pod): """Retrieves xcom value and kills xcom sidecar container""" - result = self.launcher.extract_xcom(pod) + result = self.pod_manager.extract_xcom(pod) self.log.info("xcom result: \n%s", result) return json.loads(result) @@ -413,18 +414,18 @@ def execute(self, context: 'Context'): self.await_pod_start(pod=self.pod) if self.get_logs: - self.launcher.follow_container_logs( + self.pod_manager.follow_container_logs( pod=self.pod, container_name=self.BASE_CONTAINER_NAME, ) else: - self.launcher.await_container_completion( + self.pod_manager.await_container_completion( pod=self.pod, container_name=self.BASE_CONTAINER_NAME ) if self.do_xcom_push: result = self.extract_xcom(pod=self.pod) - remote_pod = self.launcher.await_pod_completion(self.pod) + remote_pod = self.pod_manager.await_pod_completion(self.pod) finally: self.cleanup( pod=self.pod or self.pod_request_obj, @@ -441,7 +442,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): if pod_phase != PodPhase.SUCCEEDED: if self.log_events_on_failure: with _suppress(Exception): - for event in self.launcher.read_pod_events(pod).items: + for event in self.pod_manager.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) if not self.is_delete_operator_pod: with _suppress(Exception): @@ -456,7 +457,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): def process_pod_deletion(self, pod): if self.is_delete_operator_pod: self.log.info("Deleting pod: %s", pod.metadata.name) - self.launcher.delete_pod(pod) + self.pod_manager.delete_pod(pod) else: self.log.info("skipping deleting pod: %s", pod.metadata.name) @@ -574,8 +575,18 @@ def build_pod_request_obj(self, context=None): 'kubernetes_pod_operator': 'True', } ) + pod_mutation_hook(pod) return pod + def dry_run(self) -> None: + """ + Prints out the pod definition that would be created by this operator. 
+ Does not include labels specific to the task instance (since there isn't + one in a dry_run) and excludes all empty elements. + """ + pod = self.build_pod_request_obj() + print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict'))) + class _suppress(AbstractContextManager): """ @@ -600,3 +611,53 @@ def __exit__(self, exctype, excinst, exctb): logger = logging.getLogger() logger.error(str(excinst), exc_info=True) return caught_error + + +def _prune_dict(val: Any, mode='strict'): + """ + Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should + be the one used if possible, but this one is included to avoid having to + bump min airflow version. This function will be removed once the min airflow version + is bumped to 2.3. + + Given dict ``val``, returns new dict based on ``val`` with all + empty elements removed. + + What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' + then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` + will be removed if ``bool(x) is False``. + """ + + def is_empty(x): + if mode == 'strict': + return x is None + elif mode == 'truthy': + return bool(x) is False + raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") + + if isinstance(val, dict): + new_dict = {} + for k, v in val.items(): + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = _prune_dict(v, mode=mode) + if new_val: + new_dict[k] = new_val + else: + new_dict[k] = v + return new_dict + elif isinstance(val, list): + new_list = [] + for v in val: + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = _prune_dict(v, mode=mode) + if new_val: + new_list.append(new_val) + else: + new_list.append(v) + return new_list + else: + return val diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py similarity index 98% rename from airflow/providers/cncf/kubernetes/utils/pod_launcher.py rename to airflow/providers/cncf/kubernetes/utils/pod_manager.py index 43c4ebe597efd..70b5b4c2ebaed 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -36,7 +36,6 @@ from airflow.exceptions import AirflowException from airflow.kubernetes.kube_client import get_kube_client from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook from airflow.utils.log.logging_mixin import LoggingMixin @@ -79,8 +78,11 @@ def container_is_running(pod: V1Pod, container_name: str) -> bool: return container_status.state.running is not None -class PodLauncher(LoggingMixin): - """Launches PODS""" +class PodManager(LoggingMixin): + """ + Helper class for creating, monitoring, and otherwise interacting with Kubernetes pods + for use with the KubernetesPodOperator + """ def __init__( self, @@ -101,8 +103,6 @@ def __init__( def run_pod_async(self, pod: V1Pod, **kwargs) -> V1Pod: """Runs POD asynchronously""" - pod_mutation_hook(pod) - sanitized_pod = self._client.api_client.sanitize_for_serialization(pod) json_pod = json.dumps(sanitized_pod, indent=2) diff --git a/airflow/providers/databricks/operators/databricks.py b/airflow/providers/databricks/operators/databricks.py index 5aa4ec82a392f..8a0d76a6c1961 100644 --- a/airflow/providers/databricks/operators/databricks.py +++ b/airflow/providers/databricks/operators/databricks.py @@ -330,7 +330,7 @@ def __init__( self.json = _deep_string_coerce(self.json) # This 
variable will be used in case our task gets killed. - self.run_id = None + self.run_id: Optional[int] = None self.do_xcom_push = do_xcom_push def _get_hook(self) -> DatabricksHook: @@ -346,9 +346,14 @@ def execute(self, context: 'Context'): _handle_databricks_operator_execution(self, hook, self.log, context) def on_kill(self): - hook = self._get_hook() - hook.cancel_run(self.run_id) - self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id) + if self.run_id: + hook = self._get_hook() + hook.cancel_run(self.run_id) + self.log.info( + 'Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id + ) + else: + self.log.error('Error: Task: %s with invalid run_id was requested to be cancelled.', self.task_id) class DatabricksRunNowOperator(BaseOperator): @@ -544,7 +549,7 @@ def __init__( self.json = _deep_string_coerce(self.json) # This variable will be used in case our task gets killed. - self.run_id = None + self.run_id: Optional[int] = None self.do_xcom_push = do_xcom_push def _get_hook(self) -> DatabricksHook: @@ -560,6 +565,11 @@ def execute(self, context: 'Context'): _handle_databricks_operator_execution(self, hook, self.log, context) def on_kill(self): - hook = self._get_hook() - hook.cancel_run(self.run_id) - self.log.info('Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id) + if self.run_id: + hook = self._get_hook() + hook.cancel_run(self.run_id) + self.log.info( + 'Task: %s with run_id: %s was requested to be cancelled.', self.task_id, self.run_id + ) + else: + self.log.error('Error: Task: %s with invalid run_id was requested to be cancelled.', self.task_id) diff --git a/airflow/providers/facebook/ads/hooks/ads.py b/airflow/providers/facebook/ads/hooks/ads.py index 7ba0eaf508000..7ad56e7dc747e 100644 --- a/airflow/providers/facebook/ads/hooks/ads.py +++ b/airflow/providers/facebook/ads/hooks/ads.py @@ -157,7 +157,7 @@ def _facebook_report( self, account_id: str, api: FacebookAdsApi, - params: Dict[str, Any], + params: Optional[Dict[str, Any]], fields: List[str], sleep_time: int = 5, ) -> List[AdsInsights]: diff --git a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py index 8c3ff7122829d..ce4c88b8bc129 100644 --- a/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py +++ b/airflow/providers/google/cloud/example_dags/example_kubernetes_engine.py @@ -63,6 +63,7 @@ image="perl", name="test-pod", in_cluster=False, + is_delete_operator_pod=True, ) # [START howto_operator_gke_start_pod_xcom] @@ -77,6 +78,7 @@ cmds=["sh", "-c", 'mkdir -p /airflow/xcom/;echo \'[1,2,3,4]\' > /airflow/xcom/return.json'], name="test-pod-xcom", in_cluster=False, + is_delete_operator_pod=True, ) # [END howto_operator_gke_start_pod_xcom] diff --git a/airflow/providers/google/cloud/operators/kubernetes_engine.py b/airflow/providers/google/cloud/operators/kubernetes_engine.py index 2f07c4343939e..704211252eff4 100644 --- a/airflow/providers/google/cloud/operators/kubernetes_engine.py +++ b/airflow/providers/google/cloud/operators/kubernetes_engine.py @@ -20,6 +20,7 @@ import os import tempfile +import warnings from typing import TYPE_CHECKING, Dict, Optional, Sequence, Union from google.cloud.container_v1.types import Cluster @@ -300,6 +301,11 @@ class GKEStartPodOperator(KubernetesPodOperator): :type impersonation_chain: Union[str, Sequence[str]] :param regional: The location param is region name. 
:type regional: bool + :param is_delete_operator_pod: What to do when the pod reaches its final + state, or the execution is interrupted. If True, delete the + pod; if False, leave the pod. Current default is False, but this will be + changed in the next major release of this provider. + :type is_delete_operator_pod: bool """ template_fields = {'project_id', 'location', 'cluster_name'} | set(KubernetesPodOperator.template_fields) @@ -314,9 +320,21 @@ def __init__( gcp_conn_id: str = 'google_cloud_default', impersonation_chain: Optional[Union[str, Sequence[str]]] = None, regional: bool = False, + is_delete_operator_pod: Optional[bool] = None, **kwargs, ) -> None: - super().__init__(**kwargs) + if is_delete_operator_pod is None: + warnings.warn( + f"You have not set parameter `is_delete_operator_pod` in class {self.__class__.__name__}. " + "Currently the default for this parameter is `False` but in a future release the default " + "will be changed to `True`. To ensure pods are not deleted in the future you will need to " + "set `is_delete_operator_pod=False` explicitly.", + DeprecationWarning, + stacklevel=2, + ) + is_delete_operator_pod = False + + super().__init__(is_delete_operator_pod=is_delete_operator_pod, **kwargs) self.project_id = project_id self.location = location self.cluster_name = cluster_name diff --git a/airflow/providers/google/cloud/operators/stackdriver.py b/airflow/providers/google/cloud/operators/stackdriver.py index c203cb798872a..b07c9e122e7a7 100644 --- a/airflow/providers/google/cloud/operators/stackdriver.py +++ b/airflow/providers/google/cloud/operators/stackdriver.py @@ -122,7 +122,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info( @@ -223,7 +223,7 @@ def __init__( self.timeout = timeout self.metadata = metadata self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info('Enable Alert Policies: Project id: %s Filter: %s', self.project_id, self.filter_) @@ -313,7 +313,7 @@ def __init__( self.timeout = timeout self.metadata = metadata self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info('Disable Alert Policies: Project id: %s Filter: %s', self.project_id, self.filter_) @@ -405,7 +405,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info('Upsert Alert Policies: Alerts: %s Project id: %s', self.alerts, self.project_id) @@ -493,7 +493,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info('Delete Alert Policy: Project id: %s Name: %s', self.project_id, self.name) @@ -606,7 +606,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info( @@ -708,7 +708,7 @@ def __init__( self.project_id = project_id 
self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info( @@ -800,7 +800,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info( @@ -894,7 +894,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info( @@ -984,7 +984,7 @@ def __init__( self.project_id = project_id self.delegate_to = delegate_to self.impersonation_chain = impersonation_chain - self.hook = None + self.hook: Optional[StackdriverHook] = None def execute(self, context: 'Context'): self.log.info('Delete Notification Channel: Project id: %s Name: %s', self.project_id, self.name) diff --git a/airflow/providers/postgres/hooks/postgres.py b/airflow/providers/postgres/hooks/postgres.py index 95abe67c77781..fe6b53a9ecb31 100644 --- a/airflow/providers/postgres/hooks/postgres.py +++ b/airflow/providers/postgres/hooks/postgres.py @@ -192,7 +192,7 @@ def get_iam_token(self, conn: Connection) -> Tuple[str, str, int]: # Pull the custer-identifier from the beginning of the Redshift URL # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns my-cluster cluster_identifier = conn.extra_dejson.get('cluster-identifier', conn.host.split('.')[0]) - session, endpoint_url = aws_hook._get_credentials() + session, endpoint_url = aws_hook._get_credentials(region_name=None) client = session.client( "redshift", endpoint_url=endpoint_url, diff --git a/airflow/providers/postgres/operators/postgres.py b/airflow/providers/postgres/operators/postgres.py index 74d0195e0f48c..dd20de6dd8377 100644 --- a/airflow/providers/postgres/operators/postgres.py +++ b/airflow/providers/postgres/operators/postgres.py @@ -65,7 +65,7 @@ def __init__( self.autocommit = autocommit self.parameters = parameters self.database = database - self.hook = None + self.hook: Optional[PostgresHook] = None def execute(self, context: 'Context'): self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database) diff --git a/airflow/providers/slack/hooks/slack_webhook.py b/airflow/providers/slack/hooks/slack_webhook.py index c3cbef126d576..1c024fdbaa30f 100644 --- a/airflow/providers/slack/hooks/slack_webhook.py +++ b/airflow/providers/slack/hooks/slack_webhook.py @@ -34,7 +34,7 @@ class SlackWebhookHook(HttpHook): Each Slack webhook token can be pre-configured to use a specific channel, username and icon. You can override these defaults in this hook. - :param http_conn_id: connection that has Slack webhook token in the extra field + :param http_conn_id: connection that has Slack webhook token in the password field :type http_conn_id: str :param webhook_token: Slack webhook token :type webhook_token: str diff --git a/airflow/settings.py b/airflow/settings.py index a4b76d5230b87..db23a9f2ecdb0 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -184,8 +184,7 @@ def task_instance_mutation_hook(task_instance): def pod_mutation_hook(pod): """ This setting allows altering ``kubernetes.client.models.V1Pod`` object - before they are passed to the Kubernetes client by the ``PodLauncher`` - for scheduling. 
+ before they are passed to the Kubernetes client for scheduling. To define a pod mutation hook, add a ``airflow_local_settings`` module to your PYTHONPATH that defines this ``pod_mutation_hook`` function. diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 1ff57eff97112..c75a017e2b165 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -310,3 +310,48 @@ def exactly_one(*args) -> bool: "Not supported for iterable args. Use `*` to unpack your iterable in the function call." ) return sum(map(bool, args)) == 1 + + +def prune_dict(val: Any, mode='strict'): + """ + Given dict ``val``, returns new dict based on ``val`` with all + empty elements removed. + + What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict' + then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x`` + will be removed if ``bool(x) is False``. + """ + + def is_empty(x): + if mode == 'strict': + return x is None + elif mode == 'truthy': + return bool(x) is False + raise ValueError("allowable values for `mode` include 'truthy' and 'strict'") + + if isinstance(val, dict): + new_dict = {} + for k, v in val.items(): + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = prune_dict(v, mode=mode) + if new_val: + new_dict[k] = new_val + else: + new_dict[k] = v + return new_dict + elif isinstance(val, list): + new_list = [] + for v in val: + if is_empty(v): + continue + elif isinstance(v, (list, dict)): + new_val = prune_dict(v, mode=mode) + if new_val: + new_list.append(new_val) + else: + new_list.append(v) + return new_list + else: + return val diff --git a/breeze-complete b/breeze-complete index 6ddfd85fa3c4e..121d8b9ca7ebb 100644 --- a/breeze-complete +++ b/breeze-complete @@ -151,6 +151,7 @@ update-breeze-file update-extras update-local-yml-file update-setup-cfg-file +update-supported-versions update-versions vendor-k8s-json-schema verify-db-migrations-documented diff --git a/chart/templates/secrets/extra-secrets.yaml b/chart/templates/secrets/extra-secrets.yaml index 9137a0e49ded4..20c1751d96b43 100644 --- a/chart/templates/secrets/extra-secrets.yaml +++ b/chart/templates/secrets/extra-secrets.yaml @@ -36,6 +36,9 @@ metadata: "helm.sh/hook": "pre-install,pre-upgrade" "helm.sh/hook-delete-policy": "before-hook-creation" "helm.sh/hook-weight": "0" +{{- if $secretContent.type }} +type: {{ $secretContent.type }} +{{- end }} {{- if $secretContent.data }} data: {{- with $secretContent.data }} diff --git a/chart/tests/test_extra_configmaps_secrets.py b/chart/tests/test_extra_configmaps_secrets.py index 3943990e08ce7..d1e2b258428ee 100644 --- a/chart/tests/test_extra_configmaps_secrets.py +++ b/chart/tests/test_extra_configmaps_secrets.py @@ -77,6 +77,14 @@ def test_extra_secrets(self): stringData: | MY_SECRET_3: "MY_SECRET_3" MY_SECRET_4: "MY_SECRET_4" + "{{ .Release.Name }}-other-secrets-with-type": + type: kubernetes.io/dockerconfigjson + data: | + MY_SECRET_5: {{ printf "MY_SECRET_5" | b64enc }} + MY_SECRET_6: {{ printf "MY_SECRET_6" | b64enc }} + stringData: | + MY_SECRET_7: "MY_SECRET_7" + MY_SECRET_8: "MY_SECRET_8" """ ) values = yaml.safe_load(values_str) @@ -88,6 +96,7 @@ def test_extra_secrets(self): all_expected_keys = [ ("Secret", f"{RELEASE_NAME}-airflow-connections"), ("Secret", f"{RELEASE_NAME}-other-secrets"), + ("Secret", f"{RELEASE_NAME}-other-secrets-with-type"), ] assert set(k8s_objects_by_key.keys()) == set(all_expected_keys) @@ -97,16 +106,26 @@ def test_extra_secrets(self): "MY_SECRET_1": 
b64encode(b"MY_SECRET_1").decode("utf-8"), "MY_SECRET_2": b64encode(b"MY_SECRET_2").decode("utf-8"), }, + { + "MY_SECRET_5": b64encode(b"MY_SECRET_5").decode("utf-8"), + "MY_SECRET_6": b64encode(b"MY_SECRET_6").decode("utf-8"), + }, ] all_expected_string_data = [ {"AIRFLOW_CON_GCP": "gcp_connection_string"}, {"MY_SECRET_3": "MY_SECRET_3", "MY_SECRET_4": "MY_SECRET_4"}, + {"MY_SECRET_7": "MY_SECRET_7", "MY_SECRET_8": "MY_SECRET_8"}, ] - for expected_key, expected_data, expected_string_data in zip( - all_expected_keys, all_expected_data, all_expected_string_data + all_expected_types = [None, None, "kubernetes.io/dockerconfigjson"] + for expected_key, expected_data, expected_string_data, expected_type in zip( + all_expected_keys, all_expected_data, all_expected_string_data, all_expected_types ): configmap_obj = k8s_objects_by_key[expected_key] + if expected_type: + assert configmap_obj["type"] == expected_type + else: + assert "type" not in configmap_obj assert configmap_obj["data"] == expected_data assert configmap_obj["stringData"] == expected_string_data diff --git a/chart/values.schema.json b/chart/values.schema.json index 45bb67bdefd1e..4e8a1b545e3bc 100644 --- a/chart/values.schema.json +++ b/chart/values.schema.json @@ -814,6 +814,10 @@ "minProperties": 1, "additionalProperties": false, "properties": { + "type": { + "description": "Type **as string** of secret E.G. Opaque, kubernetes.io/dockerconfigjson, etc.", + "type": "string" + }, "data": { "description": "Content **as string** for the 'data' item of the secret (can be templated)", "type": "string" diff --git a/chart/values.yaml b/chart/values.yaml index 3023484a159a0..1c313af2d7d71 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -264,6 +264,7 @@ extraSecrets: {} # eg: # extraSecrets: # '{{ .Release.Name }}-airflow-connections': +# type: 'Opaque' # data: | # AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string' # AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string' diff --git a/dev/README_RELEASE_AIRFLOW.md b/dev/README_RELEASE_AIRFLOW.md index 8c02a6a253972..aac2a8691fe14 100644 --- a/dev/README_RELEASE_AIRFLOW.md +++ b/dev/README_RELEASE_AIRFLOW.md @@ -957,6 +957,7 @@ EOF This includes: +- Modify `./scripts/ci/pre-commit/supported_versions.py` and let pre-commit do the job - Sync `CHANGELOG.txt`, `UPDATING.md` and `README.md` changes - Updating issue templates in `.github/ISSUE_TEMPLATE/` with the new version diff --git a/dev/provider_packages/prepare_provider_packages.py b/dev/provider_packages/prepare_provider_packages.py index c840348a5f496..b69458b5d2cd0 100755 --- a/dev/provider_packages/prepare_provider_packages.py +++ b/dev/provider_packages/prepare_provider_packages.py @@ -2121,6 +2121,11 @@ def summarise_total_vs_bad_and_warnings(total: int, bad: int, warns: List[warnin " adheres to: 'pyarrow<3.1.0,>=3.0.0; extra == \"pandas\"'", "snowflake", ), + ( + "You have an incompatible version of 'pyarrow' installed (6.0.1), please install a version that" + " adheres to: 'pyarrow<5.1.0,>=5.0.0; extra == \"pandas\"'", + "snowflake", + ), ("SelectableGroups dict interface is deprecated. Use select.", "kombu"), ("The module cloudant is now deprecated. 
The replacement is ibmcloudant.", "cloudant"), } diff --git a/docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst b/docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst index 521c91819a966..946165b440f0e 100644 --- a/docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst +++ b/docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst @@ -64,11 +64,11 @@ Examples for the **Extra** field If SSL is enabled in Cassandra, pass in a dict in the extra field as kwargs for ``ssl.wrap_socket()``. For example: -.. code-block:: JSON +.. code-block:: json { - "ssl_options" : { - "ca_certs" : "PATH/TO/CA_CERTS" + "ssl_options": { + "ca_certs": "PATH/TO/CA_CERTS" } } @@ -78,7 +78,7 @@ Default load balancing policy is ``RoundRobinPolicy``. Following are a few sampl DCAwareRoundRobinPolicy: -.. code-block:: JSON +.. code-block:: json { "load_balancing_policy": "DCAwareRoundRobinPolicy", @@ -90,7 +90,7 @@ DCAwareRoundRobinPolicy: WhiteListRoundRobinPolicy: -.. code-block:: JSON +.. code-block:: json { "load_balancing_policy": "WhiteListRoundRobinPolicy", @@ -101,7 +101,7 @@ WhiteListRoundRobinPolicy: TokenAwarePolicy: -.. code-block:: JSON +.. code-block:: json { "load_balancing_policy": "TokenAwarePolicy", diff --git a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst index 354d95668c612..92e067c51d6b4 100644 --- a/docs/apache-airflow-providers-cncf-kubernetes/operators.rst +++ b/docs/apache-airflow-providers-cncf-kubernetes/operators.rst @@ -49,6 +49,31 @@ dependencies that are not available through the public PyPI repository. It also YAML file using the ``pod_template_file`` parameter. Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in. +Debugging KubernetesPodOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can print out the Kubernetes manifest for the pod that would be created at runtime by calling +:meth:`~.KubernetesPodOperator.dry_run` on an instance of the operator. + +.. code-block:: python + + from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator, + ) + + k = KubernetesPodOperator( + name="hello-dry-run", + image="debian", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + task_id="dry_run_demo", + do_xcom_push=True, + ) + + k.dry_run() + + How to use cluster ConfigMaps, Secrets, and Volumes with Pod? 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/apache-airflow/installation/supported-versions.rst b/docs/apache-airflow/installation/supported-versions.rst index c4a2a26df18dc..ccd81abaa9d71 100644 --- a/docs/apache-airflow/installation/supported-versions.rst +++ b/docs/apache-airflow/installation/supported-versions.rst @@ -23,19 +23,21 @@ Version Life Cycle Apache Airflow version life cycle: -+---------+-----------------+---------------+-----------------+----------------+ -| Version | State | First Release | Limited Support | EOL/Terminated | -+---------+-----------------+---------------+-----------------+----------------+ -| 2 | Supported | Dec 17, 2020 | TBD | TBD | -+---------+-----------------+---------------+-----------------+----------------+ -| 1.10 | EOL | Aug 27, 2018 | Dec 17, 2020 | June 2021 | -+---------+-----------------+---------------+-----------------+----------------+ -| 1.9 | EOL | Jan 03, 2018 | Aug 27, 2018 | Aug 2018 | -+---------+-----------------+---------------+-----------------+----------------+ -| 1.8 | EOL | Mar 19, 2017 | Jan 03, 2018 | Jan 2018 | -+---------+-----------------+---------------+-----------------+----------------+ -| 1.7 | EOL | Mar 28, 2016 | Mar 19, 2017 | Mar 2017 | -+---------+-----------------+---------------+-----------------+----------------+ + .. This table is automatically updated by pre-commit scripts/ci/pre_commit/supported_versions.py + .. Beginning of auto-generated table + +========= ===================== ========= =============== ================= ================ +Version Current Patch/Minor State First Release Limited Support EOL/Terminated +========= ===================== ========= =============== ================= ================ +2 2.2.3 Supported Dec 17, 2020 TBD TBD +1.10 1.10.15 EOL Aug 27, 2018 Dec 17, 2020 June 17, 2021 +1.9 1.9.0 EOL Jan 03, 2018 Aug 27, 2018 Aug 27, 2018 +1.8 1.8.2 EOL Mar 19, 2017 Jan 03, 2018 Jan 03, 2018 +1.7 1.7.1.2 EOL Mar 28, 2016 Mar 19, 2017 Mar 19, 2017 +========= ===================== ========= =============== ================= ================ + + .. End of auto-generated table + Limited support versions will be supported with security and critical bug fix only. EOL versions will not get any fixes nor support.
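The two auto-generated tables above are rendered from the same row data by ``tabulate``, in two formats, by the pre-commit script added later in this diff. A minimal sketch of the idea, assuming only that the ``tabulate`` package is installed (headers and rows abbreviated for illustration):

.. code-block:: python

    from tabulate import tabulate

    headers = ("Version", "State", "First Release")
    rows = (("2", "Supported", "Dec 17, 2020"),)

    # "github" yields the pipe table embedded in README.md;
    # "rst" yields the simple table used in supported-versions.rst.
    print(tabulate(rows, headers=headers, tablefmt="github", disable_numparse=True))
    print(tabulate(rows, headers=headers, tablefmt="rst", disable_numparse=True))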
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 082d2ceb39b1a..745b1457a6367 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -36,7 +36,7 @@ from airflow.kubernetes.secret import Secret from airflow.models import DAG, XCOM_RETURN_KEY, DagRun, TaskInstance from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone from airflow.version import version as airflow_version @@ -367,7 +367,7 @@ def test_port(self): assert self.expected_pod == actual_pod def test_volume_mount(self): - with mock.patch.object(PodLauncher, 'log') as mock_logger: + with mock.patch.object(PodManager, 'log') as mock_logger: volume_mount = k8s.V1VolumeMount( name='test-volume', mount_path='/tmp/test_volume', sub_path=None, read_only=False ) @@ -573,8 +573,8 @@ def test_xcom_push(self, xcom_push): self.expected_pod['spec']['containers'].append(container) assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_envs_from_secrets(self, mock_client, await_pod_completion_mock, create_pod): # GIVEN @@ -813,9 +813,9 @@ def test_init_container(self): ] assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.extract_xcom") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_pod_template_file(self, mock_client, await_pod_completion_mock, create_mock, extract_xcom_mock): extract_xcom_mock.return_value = '{}' @@ -899,8 +899,8 @@ def test_pod_template_file(self, mock_client, await_pod_completion_mock, create_ del actual_pod['metadata']['labels']['airflow_version'] assert expected_dict == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_pod_priority_class_name(self, mock_client, await_pod_completion_mock, create_mock): """Test ability to assign priorityClassName to pod""" @@ -943,7 +943,7 @@ def 
test_pod_name(self): do_xcom_push=False, ) - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") def test_on_kill(self, await_pod_completion_mock): client = kube_client.get_kube_client(in_cluster=False) @@ -994,7 +994,7 @@ def test_reattach_failing_pod_once(self): # launch pod with mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion" + "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion" ) as await_pod_completion_mock: pod_mock = MagicMock() @@ -1011,7 +1011,7 @@ def test_reattach_failing_pod_once(self): # should not call `create_pod`, because there's a pod there it should find # should use the found pod and patch as "already_checked" (in failure block) with mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" + "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod" ) as create_mock: with pytest.raises(AirflowException): k.execute(context) @@ -1022,7 +1022,7 @@ def test_reattach_failing_pod_once(self): # `create_pod` should be called because though there's still a pod to be found, # it will be `already_checked` with mock.patch( - "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod" + "airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod" ) as create_mock: with pytest.raises(AirflowException): k.execute(context) diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py index c11375579a67b..61df4ab70953a 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py +++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py @@ -36,7 +36,7 @@ from airflow.kubernetes.volume_mount import VolumeMount from airflow.models import DAG, DagRun, TaskInstance from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils import timezone from airflow.version import version as airflow_version @@ -117,8 +117,8 @@ def tearDown(self): client = kube_client.get_kube_client(in_cluster=False) client.delete_collection_namespaced_pod(namespace="default") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_image_pull_secrets_correctly_set(self, mock_client, await_pod_completion_mock, create_mock): fake_pull_secrets = "fakeSecret" @@ -266,7 +266,7 @@ def test_port(self): assert self.expected_pod == actual_pod def test_volume_mount(self): - with patch.object(PodLauncher, 'log') as mock_logger: + with patch.object(PodManager, 'log') as mock_logger: volume_mount = VolumeMount( 'test-volume', mount_path='/tmp/test_volume', sub_path=None, read_only=False ) @@ -451,8 +451,8 @@ def test_xcom_push(self): 
self.expected_pod['spec']['containers'].append(container) assert self.expected_pod == actual_pod - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start): # GIVEN @@ -480,8 +480,8 @@ def test_envs_from_configmaps(self, mock_client, mock_monitor, mock_start): k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)) ] - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.create_pod") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_pod_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.create_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") @mock.patch("airflow.kubernetes.kube_client.get_kube_client") def test_envs_from_secrets(self, mock_client, await_pod_completion_mock, create_mock): # GIVEN diff --git a/scripts/ci/pre_commit/supported_versions.py b/scripts/ci/pre_commit/supported_versions.py new file mode 100755 index 0000000000000..dd5d26a8bc5d1 --- /dev/null +++ b/scripts/ci/pre_commit/supported_versions.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from pathlib import Path
+
+from tabulate import tabulate
+
+AIRFLOW_SOURCES = Path(__file__).resolve().parent.parent.parent.parent
+
+
+HEADERS = ("Version", "Current Patch/Minor", "State", "First Release", "Limited Support", "EOL/Terminated")
+
+SUPPORTED_VERSIONS = (
+    ("2", "2.2.3", "Supported", "Dec 17, 2020", "TBD", "TBD"),
+    ("1.10", "1.10.15", "EOL", "Aug 27, 2018", "Dec 17, 2020", "June 17, 2021"),
+    ("1.9", "1.9.0", "EOL", "Jan 03, 2018", "Aug 27, 2018", "Aug 27, 2018"),
+    ("1.8", "1.8.2", "EOL", "Mar 19, 2017", "Jan 03, 2018", "Jan 03, 2018"),
+    ("1.7", "1.7.1.2", "EOL", "Mar 28, 2016", "Mar 19, 2017", "Mar 19, 2017"),
+)
+
+
+def replace_text_between(file: Path, start: str, end: str, replacement_text: str):
+    original_text = file.read_text()
+    leading_text = original_text.split(start)[0]
+    trailing_text = original_text.split(end)[1]
+    file.write_text(leading_text + start + replacement_text + end + trailing_text)
+
+
+if __name__ == '__main__':
+    replace_text_between(
+        file=AIRFLOW_SOURCES / "README.md",
+        start="<!-- Beginning of auto-generated table -->\n",
+        end="<!-- End of auto-generated table -->\n",
+        replacement_text="\n"
+        + tabulate(
+            SUPPORTED_VERSIONS, tablefmt="github", headers=HEADERS, stralign="left", disable_numparse=True
+        )
+        + "\n\n",
+    )
+    replace_text_between(
+        file=AIRFLOW_SOURCES / "docs" / "apache-airflow" / "installation" / "supported-versions.rst",
+        start=" .. Beginning of auto-generated table\n",
+        end=" .. End of auto-generated table\n",
+        replacement_text="\n"
+        + tabulate(
+            SUPPORTED_VERSIONS, tablefmt="rst", headers=HEADERS, stralign="left", disable_numparse=True
+        )
+        + "\n\n",
+    )
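
Note on the helper above: `replace_text_between` splits on the literal marker strings, so it rewrites everything between the first occurrence of `start` and the text following the first occurrence of `end` — it assumes each marker occurs exactly once in the file. A minimal, self-contained demonstration of that behaviour (the file name and `BEGIN`/`END` markers here are invented for illustration, not taken from the patch):

```python
from pathlib import Path
from tempfile import TemporaryDirectory


def replace_text_between(file: Path, start: str, end: str, replacement_text: str):
    # Same splitting logic as the pre-commit script above.
    original_text = file.read_text()
    leading_text = original_text.split(start)[0]
    trailing_text = original_text.split(end)[1]
    file.write_text(leading_text + start + replacement_text + end + trailing_text)


with TemporaryDirectory() as tmp:
    demo = Path(tmp) / "demo.md"
    demo.write_text("intro\n<!-- BEGIN -->\nstale table\n<!-- END -->\noutro\n")
    replace_text_between(demo, start="<!-- BEGIN -->\n", end="<!-- END -->\n", replacement_text="fresh table\n")
    # Only the span between the markers is replaced; text outside survives.
    assert demo.read_text() == "intro\n<!-- BEGIN -->\nfresh table\n<!-- END -->\noutro\n"
```
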
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") def test_pod_delete_even_on_launcher_error(self, delete_pod_mock): k = KubernetesPodOperator( namespace="default", @@ -504,8 +508,8 @@ def test_pod_template_file(self, randomize_name): "execution_date": mock.ANY, } - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.follow_container_logs") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_container_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.follow_container_logs") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_container_completion") def test_describes_pod_on_failure(self, await_container_mock, follow_container_mock): name_base = "test" @@ -532,8 +536,8 @@ def test_describes_pod_on_failure(self, await_container_mock, follow_container_m assert not self.client_mock.return_value.read_namespaced_pod.called - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.follow_container_logs") - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.await_container_completion") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.follow_container_logs") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_container_completion") def test_no_handle_failure_on_success(self, await_container_mock, follow_container_mock): name_base = "test" @@ -718,7 +722,7 @@ def test_node_selector(self): assert sanitized_pod["spec"]["nodeSelector"] == node_selector @pytest.mark.parametrize('do_xcom_push', [True, False]) - @mock.patch(f"{POD_LAUNCHER_CLASS}.extract_xcom") + @mock.patch(f"{POD_MANAGER_CLASS}.extract_xcom") def test_push_xcom_pod_info(self, extract_xcom, do_xcom_push): """pod name and namespace are *always* pushed; do_xcom_push only controls xcom sidecar""" extract_xcom.return_value = '{}' @@ -756,7 +760,7 @@ def test_previous_pods_ignored_for_reattached(self): _, kwargs = self.client_mock.return_value.list_namespaced_pod.call_args assert 'already_checked!=True' in kwargs['label_selector'] - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") @mock.patch( "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" ".KubernetesPodOperator.patch_already_checked" @@ -779,7 +783,7 @@ def test_mark_created_pod_if_not_deleted(self, mock_patch_already_checked, mock_ mock_patch_already_checked.assert_called_once() mock_delete_pod.assert_not_called() - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") @mock.patch( "airflow.providers.cncf.kubernetes.operators.kubernetes_pod" ".KubernetesPodOperator.patch_already_checked" @@ -802,7 +806,7 @@ def test_mark_created_pod_if_not_deleted_during_exception( mock_patch_already_checked.assert_called_once() mock_delete_pod.assert_not_called() - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.delete_pod") + @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.delete_pod") @mock.patch( "airflow.providers.cncf.kubernetes.operators." 
"kubernetes_pod.KubernetesPodOperator.patch_already_checked" @@ -834,3 +838,32 @@ def test__suppress(): raise ValueError("failure") mock_error.assert_called_once_with("failure", exc_info=True) + + +@pytest.mark.parametrize( + 'mode, expected', + [ + ( + 'strict', + { + 'b': '', + 'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, + 'd': ['', 0, '1'], + 'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']], + }, + ), + ( + 'truthy', + { + 'c': {'c': 'hi', 'd': ['1']}, + 'd': ['1'], + 'e': [{'c': 'hi', 'd': ['1']}, ['1']], + }, + ), + ], +) +def test__prune_dict(mode, expected): + l1 = ['', 0, '1', None] + d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1} + d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]} + assert _prune_dict(d2, mode=mode) == expected diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py similarity index 85% rename from tests/providers/cncf/kubernetes/utils/test_pod_launcher.py rename to tests/providers/cncf/kubernetes/utils/test_pod_manager.py index c02c025fcd870..b5561fe861120 100644 --- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py +++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py @@ -23,18 +23,18 @@ from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException -from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodPhase, container_is_running +from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running -class TestPodLauncher: +class TestPodManager: def setup_method(self): self.mock_kube_client = mock.Mock() - self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client) + self.pod_manager = PodManager(kube_client=self.mock_kube_client) def test_read_pod_logs_successfully_returns_logs(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs - logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') + logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base') assert mock.sentinel.logs == logs def test_read_pod_logs_retries_successfully(self): @@ -43,7 +43,7 @@ def test_read_pod_logs_retries_successfully(self): BaseHTTPError('Boom'), mock.sentinel.logs, ] - logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') + logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base') assert mock.sentinel.logs == logs self.mock_kube_client.read_namespaced_pod_log.assert_has_calls( [ @@ -74,12 +74,12 @@ def test_read_pod_logs_retries_fails(self): BaseHTTPError('Boom'), ] with pytest.raises(BaseHTTPError): - self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base') + self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base') def test_read_pod_logs_successfully_with_tail_lines(self): mock.sentinel.metadata = mock.MagicMock() self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs] - logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base', tail_lines=100) + logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base', tail_lines=100) assert mock.sentinel.logs == logs self.mock_kube_client.read_namespaced_pod_log.assert_has_calls( [ @@ -98,7 +98,7 @@ def test_read_pod_logs_successfully_with_tail_lines(self): def test_read_pod_logs_successfully_with_since_seconds(self): 
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
similarity index 85%
rename from tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
rename to tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index c02c025fcd870..b5561fe861120 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_launcher.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -23,18 +23,18 @@
 from urllib3.exceptions import HTTPError as BaseHTTPError

 from airflow.exceptions import AirflowException
-from airflow.providers.cncf.kubernetes.utils.pod_launcher import PodLauncher, PodPhase, container_is_running
+from airflow.providers.cncf.kubernetes.utils.pod_manager import PodManager, PodPhase, container_is_running


-class TestPodLauncher:
+class TestPodManager:
     def setup_method(self):
         self.mock_kube_client = mock.Mock()
-        self.pod_launcher = PodLauncher(kube_client=self.mock_kube_client)
+        self.pod_manager = PodManager(kube_client=self.mock_kube_client)

     def test_read_pod_logs_successfully_returns_logs(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.return_value = mock.sentinel.logs
-        logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base')
+        logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base')
         assert mock.sentinel.logs == logs

     def test_read_pod_logs_retries_successfully(self):
@@ -43,7 +43,7 @@ def test_read_pod_logs_retries_successfully(self):
             BaseHTTPError('Boom'),
             mock.sentinel.logs,
         ]
-        logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base')
+        logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base')
         assert mock.sentinel.logs == logs
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
@@ -74,12 +74,12 @@ def test_read_pod_logs_retries_fails(self):
             BaseHTTPError('Boom'),
         ]
         with pytest.raises(BaseHTTPError):
-            self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base')
+            self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base')

     def test_read_pod_logs_successfully_with_tail_lines(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
-        logs = self.pod_launcher.read_pod_logs(pod=mock.sentinel, container_name='base', tail_lines=100)
+        logs = self.pod_manager.read_pod_logs(pod=mock.sentinel, container_name='base', tail_lines=100)
         assert mock.sentinel.logs == logs
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
@@ -98,7 +98,7 @@ def test_read_pod_logs_successfully_with_since_seconds(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod_log.side_effect = [mock.sentinel.logs]
-        logs = self.pod_launcher.read_pod_logs(mock.sentinel, 'base', since_seconds=2)
+        logs = self.pod_manager.read_pod_logs(mock.sentinel, 'base', since_seconds=2)
         assert mock.sentinel.logs == logs
         self.mock_kube_client.read_namespaced_pod_log.assert_has_calls(
             [
@@ -117,7 +117,7 @@ def test_read_pod_logs_successfully_with_since_seconds(self):
     def test_read_pod_events_successfully_returns_events(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.list_namespaced_event.return_value = mock.sentinel.events
-        events = self.pod_launcher.read_pod_events(mock.sentinel)
+        events = self.pod_manager.read_pod_events(mock.sentinel)
         assert mock.sentinel.events == events

     def test_read_pod_events_retries_successfully(self):
@@ -126,7 +126,7 @@ def test_read_pod_events_retries_successfully(self):
             BaseHTTPError('Boom'),
             mock.sentinel.events,
         ]
-        events = self.pod_launcher.read_pod_events(mock.sentinel)
+        events = self.pod_manager.read_pod_events(mock.sentinel)
         assert mock.sentinel.events == events
         self.mock_kube_client.list_namespaced_event.assert_has_calls(
             [
@@ -149,12 +149,12 @@ def test_read_pod_events_retries_fails(self):
             BaseHTTPError('Boom'),
         ]
         with pytest.raises(AirflowException):
-            self.pod_launcher.read_pod_events(mock.sentinel)
+            self.pod_manager.read_pod_events(mock.sentinel)

     def test_read_pod_returns_logs(self):
         mock.sentinel.metadata = mock.MagicMock()
         self.mock_kube_client.read_namespaced_pod.return_value = mock.sentinel.pod_info
-        pod_info = self.pod_launcher.read_pod(mock.sentinel)
+        pod_info = self.pod_manager.read_pod(mock.sentinel)
         assert mock.sentinel.pod_info == pod_info

     def test_read_pod_retries_successfully(self):
@@ -163,7 +163,7 @@ def test_read_pod_retries_successfully(self):
             BaseHTTPError('Boom'),
             mock.sentinel.pod_info,
         ]
-        pod_info = self.pod_launcher.read_pod(mock.sentinel)
+        pod_info = self.pod_manager.read_pod(mock.sentinel)
         assert mock.sentinel.pod_info == pod_info
         self.mock_kube_client.read_namespaced_pod.assert_has_calls(
             [
@@ -186,7 +186,7 @@ def pod_state_gen():
         self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
         self.mock_kube_client.read_namespaced_pod_log.return_value = iter(())
-        self.pod_launcher.follow_container_logs(mock.sentinel, 'base')
+        self.pod_manager.follow_container_logs(mock.sentinel, 'base')

     def test_monitor_pod_logs_failures_non_fatal(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -209,7 +209,7 @@ def pod_log_gen():

         self.mock_kube_client.read_namespaced_pod_log.side_effect = pod_log_gen()

-        self.pod_launcher.follow_container_logs(mock.sentinel, 'base')
+        self.pod_manager.follow_container_logs(mock.sentinel, 'base')

     def test_read_pod_retries_fails(self):
         mock.sentinel.metadata = mock.MagicMock()
@@ -219,38 +219,38 @@ def test_read_pod_retries_fails(self):
             BaseHTTPError('Boom'),
         ]
         with pytest.raises(AirflowException):
-            self.pod_launcher.read_pod(mock.sentinel)
+            self.pod_manager.read_pod(mock.sentinel)

     def test_parse_log_line(self):
         log_message = "This should return no timestamp"
-        timestamp, line = self.pod_launcher.parse_log_line(log_message)
+        timestamp, line = self.pod_manager.parse_log_line(log_message)
         assert timestamp is None
         assert line == log_message

         real_timestamp = "2020-10-08T14:16:17.793417674Z"
-        timestamp, line = self.pod_launcher.parse_log_line(" ".join([real_timestamp, log_message]))
+        timestamp, line = self.pod_manager.parse_log_line(" ".join([real_timestamp, log_message]))
         assert timestamp == pendulum.parse(real_timestamp)
         assert line == log_message

         with pytest.raises(Exception):
-            self.pod_launcher.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')
+            self.pod_manager.parse_log_line('2020-10-08T14:16:17.793417674ZInvalidmessage\n')
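
`parse_log_line` is exercised here but not shown. From the assertions, it must: raise on a line with no space at all, fall back to `(None, line)` when the first token is not a parseable timestamp, and otherwise split off the RFC 3339 prefix. A plausible sketch inferred from those assertions (not the patch's actual code; details such as the error message are assumptions):

```python
from typing import Optional, Tuple

import pendulum
from pendulum.parsing.exceptions import ParserError


def parse_log_line(line: str) -> Tuple[Optional[pendulum.DateTime], str]:
    """Split a Kubernetes log line into (timestamp, message)."""
    split_at = line.find(' ')
    if split_at == -1:
        # No separator at all: the line is malformed, e.g. '...674ZInvalidmessage'.
        raise Exception(f'Log not in "{{timestamp}} {{log}}" format. Got: {line}')
    timestamp, message = line[:split_at], line[split_at + 1:].rstrip()
    try:
        return pendulum.parse(timestamp), message
    except ParserError:
        # First token is not a timestamp: return the whole line unchanged.
        return None, line
```

This matches all three branches of `test_parse_log_line` above: a plain sentence yields `(None, line)`, a timestamped line is split, and the glued-together variant raises.
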
-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_on_409_error(self, mock_run_pod_async):
         mock_run_pod_async.side_effect = [
             ApiException(status=409),
             mock.MagicMock(),
         ]
-        self.pod_launcher.create_pod(mock.sentinel)
+        self.pod_manager.create_pod(mock.sentinel)
         assert mock_run_pod_async.call_count == 2

-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_fails_on_other_exception(self, mock_run_pod_async):
         mock_run_pod_async.side_effect = [ApiException(status=504)]
         with pytest.raises(ApiException):
-            self.pod_launcher.create_pod(mock.sentinel)
+            self.pod_manager.create_pod(mock.sentinel)

-    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.run_pod_async")
+    @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async")
     def test_start_pod_retries_three_times(self, mock_run_pod_async):
         mock_run_pod_async.side_effect = [
             ApiException(status=409),
@@ -259,7 +259,7 @@ def test_start_pod_retries_three_times(self, mock_run_pod_async):
             ApiException(status=409),
         ]
         with pytest.raises(ApiException):
-            self.pod_launcher.create_pod(mock.sentinel)
+            self.pod_manager.create_pod(mock.sentinel)

         assert mock_run_pod_async.call_count == 3

@@ -270,16 +270,16 @@ def test_start_pod_raises_informative_error_on_timeout(self):
         expected_msg = "Check the pod events in kubernetes"
         mock_pod = MagicMock()
         with pytest.raises(AirflowException, match=expected_msg):
-            self.pod_launcher.await_pod_start(
+            self.pod_manager.await_pod_start(
                 pod=mock_pod,
                 startup_timeout=0,
             )

-    @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_launcher.container_is_running')
+    @mock.patch('airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running')
     def test_container_is_running(self, container_is_running_mock):
         mock_pod = MagicMock()
-        self.pod_launcher.read_pod = mock.MagicMock(return_value=mock_pod)
-        self.pod_launcher.container_is_running(None, 'base')
+        self.pod_manager.read_pod = mock.MagicMock(return_value=mock_pod)
+        self.pod_manager.container_is_running(None, 'base')
         container_is_running_mock.assert_called_with(pod=mock_pod, container_name='base')
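
The 409 tests above describe retry behaviour without showing the implementation: retry only on HTTP 409 Conflict (e.g. a name collision with a pod that is still terminating), give up after three attempts, and re-raise anything else immediately. One common way to get exactly that shape is a `tenacity` decorator with an exception predicate; the class below is a hypothetical sketch of what `PodManager.create_pod` might look like, not the patch's actual code:

```python
import tenacity
from kubernetes.client.rest import ApiException


def should_retry_start_pod(exception: BaseException) -> bool:
    """Treat only an HTTP 409 Conflict as transient and worth retrying."""
    return isinstance(exception, ApiException) and exception.status == 409


class PodManagerSketch:  # hypothetical stand-in for PodManager
    @tenacity.retry(
        stop=tenacity.stop_after_attempt(3),  # matches call_count == 3 in the test
        wait=tenacity.wait_random_exponential(),
        reraise=True,  # surface the ApiException rather than tenacity.RetryError
        retry=tenacity.retry_if_exception(should_retry_start_pod),
    )
    def create_pod(self, pod):
        return self.run_pod_async(pod)
```

With this predicate a 504 is not retried at all (so `test_start_pod_fails_on_other_exception` raises on the first call), one 409 followed by success yields two calls, and three consecutive 409s exhaust the attempts and re-raise.
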
), + ], + ) + def test_prune_dict(self, mode, expected): + l1 = ['', 0, '1', None] + d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1} + d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]} + assert prune_dict(d2, mode=mode) == expected