Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better support for Boto Waiters #28236

Merged
merged 3 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions airflow/providers/amazon/aws/hooks/base_aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import warnings
from copy import deepcopy
from functools import wraps
from os import PathLike
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar, Union

import boto3
Expand All @@ -43,6 +45,7 @@
from botocore.client import ClientMeta
from botocore.config import Config
from botocore.credentials import ReadOnlyCredentials
from botocore.waiter import Waiter, WaiterModel
from dateutil.tz import tzlocal
from slugify import slugify

Expand All @@ -51,6 +54,7 @@
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.utils.connection_wrapper import AwsConnectionWrapper
from airflow.providers.amazon.aws.waiters.base_waiter import BaseBotoWaiter
from airflow.providers_manager import ProvidersManager
from airflow.utils.helpers import exactly_one
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -764,6 +768,44 @@ def test_connection(self):
except Exception as e:
return False, str(f"{type(e).__name__!r} error occurred while testing connection: {e}")

@cached_property
def waiter_path(self) -> PathLike[str] | None:
    """
    Locate the custom waiter model file for this hook's client type.

    The file is looked up as ``waiters/<client_type>.json`` relative to the parent
    of the directory that contains this module.

    :return: The resolved path of the JSON file, or None when no such file exists.
    """
    base_dir = Path(__file__).parents[1]
    candidate = (base_dir / f"waiters/{self.client_type}.json").resolve()
    if not candidate.exists():
        return None
    return candidate

def get_waiter(self, waiter_name: str) -> Waiter:
    """
    Return the waiter registered under ``waiter_name``.

    A custom waiter defined in this hook's waiter model file takes precedence;
    when no custom waiter with that name exists, the lookup is delegated to the
    service client (``self.conn.get_waiter``).

    :param waiter_name: The name of the waiter. The name should exactly match the
        name of the key in the waiter model file (typically this is CamelCase).
    :return: The custom waiter if one is defined, otherwise whatever the service
        client returns for that name.
    """
    if self.waiter_path:
        # Read and parse the model file a single time. JSON is decoded as UTF-8
        # explicitly so the result does not depend on the platform's locale.
        with open(self.waiter_path, encoding="utf-8") as config_file:
            config = json.load(config_file)
        custom_waiters = BaseBotoWaiter(client=self.conn, model_config=config)
        # BaseBotoWaiter already built a WaiterModel from the config; reuse it to
        # check the name rather than parsing the file a second time.
        if waiter_name in custom_waiters.model.waiter_names:
            return custom_waiters.waiter(waiter_name)
    # If there is no custom waiter found for the provided name,
    # then try checking the service's official waiters.
    return self.conn.get_waiter(waiter_name)

def list_waiters(self) -> list[str]:
    """
    Collect the names of every waiter available for the service.

    :return: A new list with the official waiter names first, followed by the
        custom waiter names.
    """
    waiter_names = list(self._list_official_waiters())
    waiter_names.extend(self._list_custom_waiters())
    return waiter_names

def _list_official_waiters(self) -> list[str]:
    """Return the official waiter names, as exposed by ``self.conn.waiter_names``."""
    client = self.conn
    return client.waiter_names

def _list_custom_waiters(self) -> list[str]:
    """
    Return the names of the custom waiters defined in this hook's waiter model file.

    :return: The waiter names found in the JSON model at ``self.waiter_path``, or an
        empty list when the hook has no custom waiter file.
    """
    if not self.waiter_path:
        return []
    # Decode the JSON model as UTF-8 explicitly so the result does not depend on
    # the platform's locale.
    with open(self.waiter_path, encoding="utf-8") as config_file:
        model_config = json.load(config_file)
    return WaiterModel(model_config).waiter_names


class AwsBaseHook(AwsGenericHook[Union[boto3.client, boto3.resource]]):
"""
Expand Down
141 changes: 84 additions & 57 deletions airflow/providers/amazon/aws/operators/eks.py

Large diffs are not rendered by default.

100 changes: 100 additions & 0 deletions airflow/providers/amazon/aws/waiters/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

This module is for custom Boto3 waiter configuration files. Since documentation
on creating custom waiters is pretty sparse out in the wild, this document can
act as a rough quickstart guide. It is not meant to cover all edge cases.

# To add a new custom waiter

## Create or modify the service waiter config file

Find or create a file for the service it is related to, for example waiters/eks.json

### In the service waiter config file

Build or add to the waiter model config json in that file. For examples of what these
should look like, have a look through some official waiter models. Some examples:

* [Cloudwatch](https://github.com/boto/botocore/blob/develop/botocore/data/cloudwatch/2010-08-01/waiters-2.json)
* [EC2](https://github.com/boto/botocore/blob/develop/botocore/data/ec2/2016-11-15/waiters-2.json)
* [EKS](https://github.com/boto/botocore/blob/develop/botocore/data/eks/2017-11-01/waiters-2.json)

Below is an example of a working waiter model config that will make an EKS waiter which will wait for
all Nodegroups in a cluster to be deleted. An explanation follows the code snippet. Note the backticks
to escape the integers in the "argument" values.

```json
{
"version": 2,
"waiters": {
"all_nodegroups_deleted": {
"operation": "ListNodegroups",
"delay": 30,
"maxAttempts": 60,
"acceptors": [
{
"matcher": "path",
"argument": "length(nodegroups[]) == `0`",
"expected": true,
"state": "success"
},
{
"matcher": "path",
"expected": true,
"argument": "length(nodegroups[]) > `0`",
"state": "retry"
}
]
}
}
}
```

In the model config above we create a new waiter called `all_nodegroups_deleted` which calls
the `ListNodegroups` API endpoint. The parameters for the endpoint call must be passed into
the `waiter.wait()` call, the same as when using an official waiter. The waiter then evaluates
the "argument" expression (in this case the equivalent of `len(result) == 0`) against the result.
If the expression returns the value in "expected" (in this case `True`) then the waiter's state is
set to `success`, the waiter can close down, and the operator which called it can continue. If
`len(result) > 0` is `True` then the state is set to `retry`. The waiter will "delay" 30 seconds
before trying again. If the state does not go to `success` before the maxAttempts number of tries,
the waiter raises a WaiterException. Both `delay` and `maxAttempts` can be overridden by the user
when calling `waiter.wait()` (via `WaiterConfig={"Delay": ..., "MaxAttempts": ...}`) like any other waiter.

### That's It!

The AwsBaseHook handles the rest. Using the above waiter will look like this:
`EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName="my_cluster")`
and for testing purposes, a `list_waiters()` helper method is provided which returns the names of
all available waiters (official and custom) and can be used the same way: `EksHook().list_waiters()`


### In your Operators (How to use these)

Once configured correctly, the custom waiter will be nearly indistinguishable from an official waiter.
Below is an example of an official waiter followed by a custom one.

```python
EksHook().conn.get_waiter("nodegroup_deleted").wait(clusterName=cluster_name, nodegroupName=nodegroup_name)
EksHook().get_waiter("all_nodegroups_deleted").wait(clusterName=cluster_name)
```

Note that since the get_waiter is in the hook instead of on the client side, a custom waiter is
just `hook.get_waiter` and not `hook.conn.get_waiter`. Other than that, they should be identical.
16 changes: 16 additions & 0 deletions airflow/providers/amazon/aws/waiters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
36 changes: 36 additions & 0 deletions airflow/providers/amazon/aws/waiters/base_waiter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought BatchClient already implements some kind of waiters (I personally never use it in this hook)

  • airflow/providers/amazon/aws/hooks/batch_waiters.json
  • airflow/providers/amazon/aws/hooks/batch_waiters.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes? Looks like it. I've never used that one and it's not something you could inherit and use for other services.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean it would be a good idea to one day also migrate the batch waiters to the generic solution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this would lay the framework for that and adding whatever other custom waiters that folks want to make. The more I look at it, the more I think it'll either simplify or replace a lot of Sensors as well, which wasn't the intended purpose but a very nice side effect.

# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import boto3
from botocore.waiter import Waiter, WaiterModel, create_waiter_with_client


class BaseBotoWaiter:
    """
    Factory for custom Boto3 waiters built from a waiter model config.

    Details on writing the config live in airflow/providers/amazon/aws/waiters/README.md
    """

    def __init__(self, client: boto3.client, model_config: dict) -> None:
        # Keep the client the waiters will be bound to, and turn the raw config
        # dict into a WaiterModel once so it can be reused for every waiter() call.
        self.client = client
        self.model = WaiterModel(model_config)

    def waiter(self, waiter_name: str) -> Waiter:
        """Create the waiter called ``waiter_name`` from the stored model and client."""
        return create_waiter_with_client(
            client=self.client,
            waiter_model=self.model,
            waiter_name=waiter_name,
        )
24 changes: 24 additions & 0 deletions airflow/providers/amazon/aws/waiters/eks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"version": 2,
"waiters": {
"all_nodegroups_deleted": {
"operation": "ListNodegroups",
"delay": 30,
"maxAttempts": 60,
"acceptors": [
{
"matcher": "path",
"argument": "length(nodegroups[]) == `0`",
"expected": true,
"state": "success"
},
{
"matcher": "path",
"expected": true,
"argument": "length(nodegroups[]) > `0`",
"state": "retry"
}
]
}
}
}
1 change: 1 addition & 0 deletions dev/provider_packages/MANIFEST_TEMPLATE.in.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

{% if PROVIDER_PACKAGE_ID == 'amazon' %}
include airflow/providers/amazon/aws/hooks/batch_waiters.json
include airflow/providers/amazon/aws/waiters/*.json
{% elif PROVIDER_PACKAGE_ID == 'google' %}
include airflow/providers/google/cloud/example_dags/*.yaml
include airflow/providers/google/cloud/example_dags/*.sql
Expand Down
Loading