Skip to content

Commit

Permalink
Implement dry_run for KubernetesPodOperator (#20573)
Browse files Browse the repository at this point in the history
Calling task.dry_run() will print out the kubectl manifest for the pod that would be created (excluding labels that are derived from the task instance context).
  • Loading branch information
dstandish authored Dec 30, 2021
1 parent e634175 commit d56ff76
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 1 deletion.
59 changes: 59 additions & 0 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,15 @@ def build_pod_request_obj(self, context=None):
pod_mutation_hook(pod)
return pod

def dry_run(self) -> None:
"""
Prints out the pod definition that would be created by this operator.
Does not include labels specific to the task instance (since there isn't
one in a dry_run) and excludes all empty elements.
"""
pod = self.build_pod_request_obj()
print(yaml.dump(_prune_dict(pod.to_dict(), mode='strict')))


class _suppress(AbstractContextManager):
"""
Expand All @@ -602,3 +611,53 @@ def __exit__(self, exctype, excinst, exctb):
logger = logging.getLogger()
logger.error(str(excinst), exc_info=True)
return caught_error


def _prune_dict(val: Any, mode='strict'):
"""
Note: this is duplicated from ``airflow.utils.helpers.prune_dict``. That one should
be the one used if possible, but this one is included to avoid having to
bump min airflow version. This function will be removed once the min airflow version
is bumped to 2.3.
Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.
What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
will be removed if ``bool(x) is False``.
"""

def is_empty(x):
if mode == 'strict':
return x is None
elif mode == 'truthy':
return bool(x) is False
raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

if isinstance(val, dict):
new_dict = {}
for k, v in val.items():
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = _prune_dict(v, mode=mode)
if new_val:
new_dict[k] = new_val
else:
new_dict[k] = v
return new_dict
elif isinstance(val, list):
new_list = []
for v in val:
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = _prune_dict(v, mode=mode)
if new_val:
new_list.append(new_val)
else:
new_list.append(v)
return new_list
else:
return val
45 changes: 45 additions & 0 deletions airflow/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,48 @@ def exactly_one(*args) -> bool:
"Not supported for iterable args. Use `*` to unpack your iterable in the function call."
)
return sum(map(bool, args)) == 1


def prune_dict(val: Any, mode='strict'):
"""
Given dict ``val``, returns new dict based on ``val`` with all
empty elements removed.
What constitutes "empty" is controlled by the ``mode`` parameter. If mode is 'strict'
then only ``None`` elements will be removed. If mode is ``truthy``, then element ``x``
will be removed if ``bool(x) is False``.
"""

def is_empty(x):
if mode == 'strict':
return x is None
elif mode == 'truthy':
return bool(x) is False
raise ValueError("allowable values for `mode` include 'truthy' and 'strict'")

if isinstance(val, dict):
new_dict = {}
for k, v in val.items():
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_dict[k] = new_val
else:
new_dict[k] = v
return new_dict
elif isinstance(val, list):
new_list = []
for v in val:
if is_empty(v):
continue
elif isinstance(v, (list, dict)):
new_val = prune_dict(v, mode=mode)
if new_val:
new_list.append(new_val)
else:
new_list.append(v)
return new_list
else:
return val
25 changes: 25 additions & 0 deletions docs/apache-airflow-providers-cncf-kubernetes/operators.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,31 @@ dependencies that are not available through the public PyPI repository. It also
YAML file using the ``pod_template_file`` parameter.
Ultimately, it allows Airflow to act a job orchestrator - no matter the language those jobs are written in.

Debugging KubernetesPodOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can print out the Kubernetes manifest for the pod that would be created at runtime by calling
:meth:`~.KubernetesPodOperator.dry_run` on an instance of the operator.

.. code-block:: python
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
k = KubernetesPodOperator(
name="hello-dry-run",
image="debian",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
task_id="dry_run_demo",
do_xcom_push=True,
)
k.dry_run()
How to use cluster ConfigMaps, Secrets, and Volumes with Pod?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
35 changes: 34 additions & 1 deletion tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
from airflow.models import DAG, DagRun, TaskInstance
from airflow.models.xcom import IN_MEMORY_DAGRUN_ID
from airflow.operators.dummy import DummyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator, _suppress
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
_prune_dict,
_suppress,
)
from airflow.utils import timezone

DEFAULT_DATE = timezone.datetime(2016, 1, 1, 1, 0, 0)
Expand Down Expand Up @@ -834,3 +838,32 @@ def test__suppress():
raise ValueError("failure")

mock_error.assert_called_once_with("failure", exc_info=True)


@pytest.mark.parametrize(
'mode, expected',
[
(
'strict',
{
'b': '',
'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
'd': ['', 0, '1'],
'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
},
),
(
'truthy',
{
'c': {'c': 'hi', 'd': ['1']},
'd': ['1'],
'e': [{'c': 'hi', 'd': ['1']}, ['1']],
},
),
],
)
def test__prune_dict(mode, expected):
l1 = ['', 0, '1', None]
d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
assert _prune_dict(d2, mode=mode) == expected
29 changes: 29 additions & 0 deletions tests/utils/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
build_airflow_url_with_query,
exactly_one,
merge_dicts,
prune_dict,
validate_group_key,
validate_key,
)
Expand Down Expand Up @@ -262,3 +263,31 @@ def assert_exactly_one(true=0, truthy=0, false=0, falsy=0):
def test_exactly_one_should_fail(self):
with pytest.raises(ValueError):
exactly_one([True, False])

@pytest.mark.parametrize(
'mode, expected',
[
(
'strict',
{
'b': '',
'c': {'b': '', 'c': 'hi', 'd': ['', 0, '1']},
'd': ['', 0, '1'],
'e': ['', 0, {'b': '', 'c': 'hi', 'd': ['', 0, '1']}, ['', 0, '1'], ['']],
},
),
(
'truthy',
{
'c': {'c': 'hi', 'd': ['1']},
'd': ['1'],
'e': [{'c': 'hi', 'd': ['1']}, ['1']],
},
),
],
)
def test_prune_dict(self, mode, expected):
l1 = ['', 0, '1', None]
d1 = {'a': None, 'b': '', 'c': 'hi', 'd': l1}
d2 = {'a': None, 'b': '', 'c': d1, 'd': l1, 'e': [None, '', 0, d1, l1, ['']]}
assert prune_dict(d2, mode=mode) == expected

0 comments on commit d56ff76

Please sign in to comment.