Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dry_run for KubernetesPodOperator #20573

Merged
merged 1 commit into from
Dec 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,15 @@ def build_pod_request_obj(self, context=None):
)
return pod

def dry_run(self) -> None:
"""
Prints out the pod definition that would be created by this operator.
Does not include labels specific to the task instance (since there isn't
one in a dry_run) and excludes all empty elements.
"""
pod = self.build_pod_request_obj()
dstandish marked this conversation as resolved.
Show resolved Hide resolved
print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))


class _suppress(AbstractContextManager):
"""
Expand All @@ -600,3 +609,53 @@ def __exit__(self, exctype, excinst, exctb):
logger = logging.getLogger()
logger.error(str(excinst), exc_info=True)
return caught_error


def _prune_dict(val: Any, mode='strict'):
"""
Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should
be the one used if possible, but this one is included to avoid having to
bump min airflow version. This function will be removed once the min airflow version
is bumped to 2.3.

Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.

What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
will be removed if ``bool(x) is False``.
"""

def is_empty(x):
if mode == 'strict':
return x is None
elif mode == 'truthy':
return bool(x) is False
raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

if isinstance(val, dict):
new_dict = {}
for k, v in val.items():
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = _prune_dict(v, mode=mode)
if new_val:
new_dict[k] = new_val
else:
new_dict[k] = v
return new_dict
elif isinstance(val, list):
new_list = []
for v in val:
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = _prune_dict(v, mode=mode)
if new_val:
new_list.append(new_val)
else:
new_list.append(v)
return new_list
else:
return val
45 changes: 45 additions & 0 deletions airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,48 @@ def exactly_one(*args) -> bool:
"Not supported for iterable args. Use `*` to unpack your iterable in the function call."
)
return sum(map(bool, args)) == 1


def prune_dict(val: Any, mode='strict'):
"""
Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.

What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
will be removed if ``bool(x) is False``.
"""

def is_empty(x):
if mode == 'strict':
return x is None
elif mode == 'truthy':
return bool(x) is False
raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

if isinstance(val, dict):
new_dict = {}
for k, v in val.items():
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_dict[k] = new_val
else:
new_dict[k] = v
return new_dict
elif isinstance(val, list):
new_list = []
for v in val:
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_list.append(new_val)
else:
new_list.append(v)
return new_list
else:
return val
25 changes: 25 additions & 0 deletions docs/apache-airflow-providers-cncf-kubernetes/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@ dependencies that are not available through the public PyPI repository. It also
YAML file using the ``pod_template_file`` parameter.
Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.

Debugging KubernetesPodOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can print out the Kubernetes manifest for the pod that would be created at runtime by calling
:meth:`~.KubernetesPodOperator.dry_run` on an instance of the operator.

.. code-block:: python

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)

k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
)

k.dry_run()


How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
35 changes: 34 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
from airflow.operators.dummy import DummyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
_prune_dict,
_suppress,
)
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
Expand Down Expand Up @@ -834,3 +838,32 @@ def test__suppress():
raise ValueError("failure")

mock_error.assert_called_once_with("failure", exc_info=True)


@pytest.mark.parametrize(
'mode, expected',
[
(
'strict',
{
'b': '',
'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
'd': ['', 0, '1'],
'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
},
),
(
'truthy',
{
'c': {'c': 'hi', 'd': ['1']},
'd': ['1'],
'e': [{'c': 'hi', 'd': ['1']}, ['1']],
},
),
],
)
def test__prune_dict(mode, expected):
l1 = ['', 0, '1', None]
d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
assert _prune_dict(d2, mode=mode) == expected
29 changes: 29 additions & 0 deletions tests/utils/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
build_airflow_url_with_query,
exactly_one,
merge_dicts,
prune_dict,
validate_group_key,
validate_key,
)
Expand Down Expand Up @@ -262,3 +263,31 @@ def assert_exactly_one(true=0, truthy=0, false=0, falsy=0):
def test_exactly_one_should_fail(self):
with pytest.raises(ValueError):
exactly_one([True, False])

@pytest.mark.parametrize(
'mode, expected',
[
(
'strict',
{
'b': '',
'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
'd': ['', 0, '1'],
'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
},
),
(
'truthy',
{
'c': {'c': 'hi', 'd': ['1']},
'd': ['1'],
'e': [{'c': 'hi', 'd': ['1']}, ['1']],
},
),
],
)
def test_prune_dict(self, mode, expected):
l1 = ['', 0, '1', None]
d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
assert prune_dict(d2, mode=mode) == expected