Skip to content

Commit

Permalink
Merge branch 'apache:main' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
subkanthi authored Dec 31, 2021
2 parents 00fbfcb + 57ed3ab commit 0c7cc12
Show file tree
Hide file tree
Showing 40 changed files with 575 additions and 157 deletions.
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,13 @@ repos:
files: ^Dockerfile$
pass_filenames: false
additional_dependencies: ['rich']
- id: update-supported-versions
name: Updates supported versions in documentation
entry: ./scripts/ci/pre_commit/supported_versions.py
language: python
files: ^scripts/ci/pre_commit/supported_versions.py$|^README.md$|^docs/apache-airflow/supported-versions.rst$
pass_filenames: false
additional_dependencies: ['tabulate']
- id: update-version
name: Update version to the latest version in the documentation
entry: ./scripts/ci/pre_commit/pre_commit_update_versions.py
Expand Down
4 changes: 2 additions & 2 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2209,8 +2209,8 @@ This is the current syntax for `./breeze <./breeze>`_:
pyupgrade restrict-start_date rst-backticks setup-order setup-extra-packages
shellcheck sort-in-the-wild sort-spelling-wordlist stylelint trailing-whitespace
ui-lint update-breeze-file update-extras update-local-yml-file update-setup-cfg-file
update-versions vendor-k8s-json-schema verify-db-migrations-documented version-sync
www-lint yamllint yesqa
update-supported-versions update-versions vendor-k8s-json-schema
verify-db-migrations-documented version-sync www-lint yamllint yesqa
You can pass extra arguments including options to the pre-commit framework as
<EXTRA_ARGS> passed after --. For example:
Expand Down
19 changes: 12 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,18 @@ packages:

Apache Airflow version life cycle:

| Version | Current Patch/Minor | State | First Release | Limited Support | EOL/Terminated |
|---------|---------------------|-----------|---------------|-----------------|----------------|
| 2 | 2.2.3 | Supported | Dec 17, 2020 | TBD | TBD |
| 1.10 | 1.10.15 | EOL | Aug 27, 2018 | Dec 17, 2020 | June 17, 2021 |
| 1.9 | 1.9.0 | EOL | Jan 03, 2018 | Aug 27, 2018 | Aug 27, 2018 |
| 1.8 | 1.8.2 | EOL | Mar 19, 2017 | Jan 03, 2018 | Jan 03, 2018 |
| 1.7 | 1.7.1.2 | EOL | Mar 28, 2016 | Mar 19, 2017 | Mar 19, 2017 |
<!-- This table is automatically updated by pre-commit scripts/ci/pre-commit/supported_versions.py -->
<!-- Beginning of auto-generated table -->

| Version | Current Patch/Minor | State | First Release | Limited Support | EOL/Terminated |
|-----------|-----------------------|-----------|-----------------|-------------------|------------------|
| 2 | 2.2.3 | Supported | Dec 17, 2020 | TBD | TBD |
| 1.10 | 1.10.15 | EOL | Aug 27, 2018 | Dec 17, 2020 | June 17, 2021 |
| 1.9 | 1.9.0 | EOL | Jan 03, 2018 | Aug 27, 2018 | Aug 27, 2018 |
| 1.8 | 1.8.2 | EOL | Mar 19, 2017 | Jan 03, 2018 | Jan 03, 2018 |
| 1.7 | 1.7.1.2 | EOL | Mar 28, 2016 | Mar 19, 2017 | Mar 19, 2017 |

<!-- End of auto-generated table -->

Limited support versions will be supported with security and critical bug fix only.
EOL versions will not get any fixes nor support.
Expand Down
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ require Breeze Docker images to be installed locally.
------------------------------------ ---------------------------------------------------------------- ------------
``update-setup-cfg-file`` Update setup.cfg file with all licenses
------------------------------------ ---------------------------------------------------------------- ------------
``update-supported-versions`` Updates supported versions in documentation
------------------------------------ ---------------------------------------------------------------- ------------
``update-versions`` Updates latest versions in the documentation
------------------------------------ ---------------------------------------------------------------- ------------
``vendor-k8s-json-schema`` Vendor k8s schema definitions in the helm chart schema file
Expand Down
6 changes: 3 additions & 3 deletions airflow/kubernetes/pod_launcher_deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

warnings.warn(
"""
Please use :mod: Please use `airflow.providers.cncf.kubernetes.utils.pod_launcher`
Please use :mod: Please use `airflow.providers.cncf.kubernetes.utils.pod_manager`
To use this module install the provider package by installing this pip package:
Expand All @@ -62,7 +62,7 @@ class PodStatus:

class PodLauncher(LoggingMixin):
"""Deprecated class for launching pods. please use
airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher instead
airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead
"""

def __init__(
Expand All @@ -74,7 +74,7 @@ def __init__(
):
"""
Deprecated class for launching pods. please use
airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher instead
airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager instead
Creates the launcher.
:param kube_client: kubernetes client
Expand Down
19 changes: 19 additions & 0 deletions airflow/providers/amazon/aws/operators/eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,12 @@ class EksPodOperator(KubernetesPodOperator):
empty, then the default boto3 configuration would be used (and must be
maintained on each worker node).
:type aws_conn_id: str
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True, delete the
pod; if False, leave the pod. Current default is False, but this will be
changed in the next major release of this provider.
:type is_delete_operator_pod: bool
"""

template_fields: Sequence[str] = tuple(
Expand All @@ -647,6 +653,7 @@ def __init__(
pod_username: Optional[str] = None,
aws_conn_id: str = DEFAULT_CONN_ID,
region: Optional[str] = None,
is_delete_operator_pod: Optional[bool] = None,
**kwargs,
) -> None:
if pod_name is None:
Expand All @@ -658,6 +665,17 @@ def __init__(
)
pod_name = DEFAULT_POD_NAME

if is_delete_operator_pod is None:
warnings.warn(
f"You have not set parameter `is_delete_operator_pod` in class {self.__class__.__name__}. "
"Currently the default for this parameter is `False` but in a future release the default "
"will be changed to `True`. To ensure pods are not deleted in the future you will need to "
"set `is_delete_operator_pod=False` explicitly.",
DeprecationWarning,
stacklevel=2,
)
is_delete_operator_pod = False

self.cluster_name = cluster_name
self.in_cluster = in_cluster
self.namespace = namespace
Expand All @@ -668,6 +686,7 @@ def __init__(
in_cluster=self.in_cluster,
namespace=self.namespace,
name=self.pod_name,
is_delete_operator_pod=is_delete_operator_pod,
**kwargs,
)
if pod_username:
Expand Down
9 changes: 5 additions & 4 deletions airflow/providers/amazon/aws/transfers/ftp_to_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ def __init__(
self.encrypt = encrypt
self.gzip = gzip
self.acl_policy = acl_policy
self.s3_hook = None
self.ftp_hook = None
self.s3_hook: Optional[S3Hook] = None
self.ftp_hook: Optional[FTPHook] = None

def __upload_to_s3_from_ftp(self, remote_filename, s3_file_key):
with NamedTemporaryFile() as local_tmp_file:
Expand Down Expand Up @@ -132,12 +132,13 @@ def execute(self, context: 'Context'):
if self.ftp_filenames == '*':
files = list_dir
else:
files = list(filter(lambda file: self.ftp_filenames in file, list_dir))
ftp_filename: str = self.ftp_filenames
files = list(filter(lambda f: ftp_filename in f, list_dir))

for file in files:
self.log.info(f'Moving file {file}')

if self.s3_filenames:
if self.s3_filenames and isinstance(self.s3_filenames, str):
filename = file.replace(self.ftp_filenames, self.s3_filenames)
else:
filename = file
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/drill/operators/drill.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(
self.sql = sql
self.drill_conn_id = drill_conn_id
self.parameters = parameters
self.hook = None
self.hook: Optional[DrillHook] = None

def execute(self, context: 'Context'):
self.log.info('Executing: %s on %s', self.sql, self.drill_conn_id)
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/apache/kylin/operators/kylin_cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def execute(self, context: 'Context'):
_hook = KylinHook(kylin_conn_id=self.kylin_conn_id, project=self.project, dsn=self.dsn)

_support_invoke_command = kylinpy.CubeSource.support_invoke_command
if not self.command:
raise AirflowException(f'Kylin:Command {self.command} can not be empty')
if self.command.lower() not in _support_invoke_command:
raise AirflowException(
f'Kylin:Command {self.command} can not match kylin command list {_support_invoke_command}'
Expand Down
2 changes: 1 addition & 1 deletion airflow/providers/apache/pig/operators/pig.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __init__(
self.pig = pig
self.pig_cli_conn_id = pig_cli_conn_id
self.pig_opts = pig_opts
self.hook = None
self.hook: Optional[PigCliHook] = None

def prepare_template(self):
if self.pigparams_jinja_translate:
Expand Down
58 changes: 58 additions & 0 deletions airflow/providers/asana/operators/asana_tasks.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Optional

from airflow.models import BaseOperator

class AsanaCreateTaskOperator(BaseOperator):
def __init__(
self,
*,
name: str,
conn_id: Optional[str] = None,
task_parameters: Optional[dict] = None,
**kwargs,
) -> None: ...

class AsanaUpdateTaskOperator(BaseOperator):
def __init__(
self,
*,
conn_id: Optional[str] = None,
asana_task_gid: str,
task_parameters: dict,
**kwargs,
) -> None: ...

class AsanaDeleteTaskOperator(BaseOperator):
def __init__(
self,
*,
conn_id: Optional[str] = None,
asana_task_gid: str,
**kwargs,
) -> None: ...

class AsanaFindTaskOperator(BaseOperator):
def __init__(
self,
*,
conn_id: Optional[str] = None,
search_parameters: Optional[dict] = None,
**kwargs,
) -> None: ...
19 changes: 11 additions & 8 deletions airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ Breaking changes
~~~~~~~~~~~~~~~~

* ``Simplify KubernetesPodOperator (#19572)``
* Class ``pod_launcher.PodLauncher`` renamed to ``pod_manager.PodManager``
* :func:`airflow.settings.pod_mutation_hook` is no longer called in :meth:`~cncf.kubernetes.utils.pod_manager.PodManager.run_pod_async``. For ``KubernetesPodOperator``, mutation now occurs in ``build_pod_request_obj``.
* Parameter ``is_delete_operator_pod`` default is changed to ``True`` so that pods are deleted after task completion and not left to accumulate. In practice it seems more common to disable pod deletion only on a temporary basis for debugging purposes and therefore pod deletion is the more sensible default.

.. warning:: Many methods in :class:`~.KubernetesPodOperator` and class:`~.PodLauncher` have been renamed.
If you have subclassed :class:`~.KubernetesPodOperator` will need to update your subclass to reflect
the new structure. Additionally ``PodStatus`` enum has been renamed to ``PodPhase``.
.. warning:: Many methods in :class:`~.KubernetesPodOperator` and class:`~.PodManager` (formerly named ``PodLauncher``)
have been renamed. If you have subclassed :class:`~.KubernetesPodOperator` you will need to update your subclass to
reflect the new structure. Additionally, class ``PodStatus`` has been renamed to ``PodPhase``.

Notes on changes KubernetesPodOperator and PodLauncher
``````````````````````````````````````````````````````
Expand All @@ -51,15 +54,15 @@ into the top level of ``execute`` because it can be the same for "attached" pods

:meth:`~.KubernetesPodOperator.get_or_create_pod` tries first to find an existing pod using labels
specific to the task instance (see :meth:`~.KubernetesPodOperator.find_pod`).
If one does not exist it :meth:`creates a pod <~.PodLauncher.create_pod>`.
If one does not exist it :meth:`creates a pod <~.PodManager.create_pod>`.

The "waiting" part of execution has three components. The first step is to wait for the pod to leave the
``Pending`` phase (:meth:`~.KubernetesPodOperator.await_pod_start`). Next, if configured to do so,
the operator will :meth:`follow the base container logs <~.KubernetesPodOperator.await_pod_start>`
and forward these logs to the task logger until the ``base`` container is done. If not configured to harvest the
logs, the operator will instead :meth:`poll for container completion until done <~.KubernetesPodOperator.await_container_completion>`;
either way, we must await container completion before harvesting xcom. After (optionally) extracting the xcom
value from the base container, we :meth:`await pod completion <~.PodLauncher.await_pod_completion>`.
value from the base container, we :meth:`await pod completion <~.PodManager.await_pod_completion>`.

Previously, depending on whether the pod was "reattached to" (e.g. after a worker failure) or
created anew, the waiting logic may have occurred in either ``handle_pod_overlap`` or ``create_new_pod_for_operator``.
Expand All @@ -80,7 +83,7 @@ Details on method renames, refactors, and deletions

In ``KubernetesPodOperator``:

* Method ``create_pod_launcher`` is converted to cached property ``launcher``
* Method ``create_pod_launcher`` is converted to cached property ``pod_manager``
* Construction of k8s ``CoreV1Api`` client is now encapsulated within cached property ``client``
* Logic to search for an existing pod (e.g. after an airflow worker failure) is moved out of ``execute`` and into method ``find_pod``.
* Method ``handle_pod_overlap`` is removed. Previously it monitored a "found" pod until completion. With this change the pod monitoring (and log following) is orchestrated directly from ``execute`` and it is the same whether it's a "found" pod or a "new" pod. See methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.
Expand All @@ -90,7 +93,7 @@ In ``KubernetesPodOperator``:
* Method ``_try_numbers_match`` is removed.
* Method ``create_new_pod_for_operator`` is removed. Previously it would mutate the labels on ``self.pod``, launch the pod, monitor the pod to completion etc. Now this logic is in part handled by ``get_or_create_pod``, where a new pod will be created if necessary. The monitoring etc is now orchestrated directly from ``execute``. Again, see the calls to methods ``await_pod_start``, ``follow_container_logs``, ``await_container_completion`` and ``await_pod_completion``.

In ``pod_launcher.py``, in class ``PodLauncher``:
In class ``PodManager`` (formerly ``PodLauncher``):

* Method ``start_pod`` is removed and split into two methods: ``create_pod`` and ``await_pod_start``.
* Method ``monitor_pod`` is removed and split into methods ``follow_container_logs``, ``await_container_completion``, ``await_pod_completion``
Expand All @@ -99,7 +102,7 @@ In ``pod_launcher.py``, in class ``PodLauncher``:
* Method ``read_pod_logs`` now takes kwarg ``container_name``


Other changes in ``pod_launcher.py``:
Other changes in ``pod_manager.py`` (formerly ``pod_launcher.py``):

* Enum-like class ``PodStatus`` is renamed ``PodPhase``, and the values are no longer lower-cased.

Expand Down
Loading

0 comments on commit 0c7cc12

Please sign in to comment.