From a1c532eacfeddcbefaa3e565a0522e25315286c4 Mon Sep 17 00:00:00 2001 From: subkanthi Date: Mon, 22 Nov 2021 13:36:39 -0500 Subject: [PATCH 01/27] [16185] Added LocalKubernetesExecutor to breeze supported executors --- BREEZE.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/BREEZE.rst b/BREEZE.rst index a62a9734cd3c1..b6a14240ffd33 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2422,6 +2422,7 @@ This is the current syntax for `./breeze <./breeze>`_: One of: KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor + LocalKubernetesExecutor Default: KubernetesExecutor From d16a343c1478eb068e01d7ef363d91bb704918f8 Mon Sep 17 00:00:00 2001 From: subkanthi Date: Mon, 22 Nov 2021 13:38:51 -0500 Subject: [PATCH 02/27] Revert "[16185] Added LocalKubernetesExecutor to breeze supported executors" This reverts commit a1c532eacfeddcbefaa3e565a0522e25315286c4. --- BREEZE.rst | 1 - 1 file changed, 1 deletion(-) diff --git a/BREEZE.rst b/BREEZE.rst index b6a14240ffd33..a62a9734cd3c1 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2422,7 +2422,6 @@ This is the current syntax for `./breeze <./breeze>`_: One of: KubernetesExecutor CeleryExecutor LocalExecutor CeleryKubernetesExecutor - LocalKubernetesExecutor Default: KubernetesExecutor From 68c3443008af936df384244fb6e7a249d07b1458 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 18 Jan 2022 09:15:35 -0500 Subject: [PATCH 03/27] Added logic for kubernetes decorator --- airflow/decorators/__init__.py | 3 +- airflow/example_dags/example_kubernetes.py | 170 ++++++++++++++++++ .../cncf/kubernetes/decorators/__init__.py | 17 ++ .../cncf/kubernetes/decorators/kubernetes.py | 162 +++++++++++++++++ .../kubernetes/operators/kubernetes_pod.py | 1 + 5 files changed, 352 insertions(+), 1 deletion(-) create mode 100644 airflow/example_dags/example_kubernetes.py create mode 100644 airflow/providers/cncf/kubernetes/decorators/__init__.py create mode 100644 airflow/providers/cncf/kubernetes/decorators/kubernetes.py diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 47a20d47826ef..f791848bb4aa1 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -21,10 +21,11 @@ from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin from airflow.decorators.task_group import task_group # noqa from airflow.models.dag import dag # noqa +from airflow.providers.cncf.kubernetes.decorators.kubernetes import KubernetesDecoratorMixin from airflow.providers_manager import ProvidersManager -class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin): +class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin, KubernetesDecoratorMixin): def __getattr__(self, name): if name.startswith("__"): raise AttributeError(f'{type(self).__name__} has no attribute {name!r}') diff --git a/airflow/example_dags/example_kubernetes.py b/airflow/example_dags/example_kubernetes.py new file mode 100644 index 0000000000000..ff914d61a4a86 --- /dev/null +++ b/airflow/example_dags/example_kubernetes.py @@ -0,0 +1,170 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This is an example dag for using the KubernetesPodOperator. +""" + +from datetime import datetime + +from kubernetes.client import models as k8s + +from airflow import DAG +from airflow.decorators import dag, task +from airflow.kubernetes.secret import Secret +from airflow.operators.bash import BashOperator + +# [START howto_operator_k8s_cluster_resources] +secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') +secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') +secret_all_keys = Secret('env', None, 'airflow-secrets-2') +volume_mount = k8s.V1VolumeMount( + name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True +) + +configmaps = [ + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')), + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')), +] + +volume = k8s.V1Volume( + name='test-volume', + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), +) + +port = k8s.V1ContainerPort(name='http', container_port=80) + +init_container_volume_mounts = [ + k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True) +] + +init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')] + +init_container = k8s.V1Container( + name="init-container", + image="ubuntu:16.04", + env=init_environments, + volume_mounts=init_container_volume_mounts, + command=["bash", "-cx"], + args=["echo 10"], +) + +affinity = k8s.V1Affinity( + node_affinity=k8s.V1NodeAffinity( + preferred_during_scheduling_ignored_during_execution=[ + k8s.V1PreferredSchedulingTerm( + weight=1, + preference=k8s.V1NodeSelectorTerm( + match_expressions=[ + k8s.V1NodeSelectorRequirement(key="disktype", operator="in", values=["ssd"]) + ] + ), + ) + ] + ), + pod_affinity=k8s.V1PodAffinity( + required_during_scheduling_ignored_during_execution=[ + k8s.V1WeightedPodAffinityTerm( + weight=1, + pod_affinity_term=k8s.V1PodAffinityTerm( + label_selector=k8s.V1LabelSelector( + match_expressions=[ + k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1") + ] + ), + topology_key="failure-domain.beta.kubernetes.io/zone", + ), + ) + ] + ), +) + +tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] + +# [END howto_operator_k8s_cluster_resources] + + +with DAG( + dag_id='example_kubernetes_operator_3', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=['example'], +) as dag: + k = KubernetesPodOperator( + namespace='default', + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo", "10"], + labels={"foo": "bar"}, + # secrets=[secret_file, secret_env, secret_all_keys], + ports=[port], + # volumes=[volume], + # volume_mounts=[volume_mount], + # env_from=configmaps, + name="airflow-test-pod", + task_id="task", + # affinity=affinity, + is_delete_operator_pod=True, + hostnetwork=False, + tolerations=tolerations, + # init_containers=[init_container], + priority_class_name="medium", + ) + + @task.kubernetes(image='python:3.9-slim-buster', name='k8s_test', namespace='default') + def execute_in_k8s_pod(): + print("Hello from k8s pod") + + execute_in_k8s_pod_instance = execute_in_k8s_pod() + k >> execute_in_k8s_pod_instance + + # [START howto_operator_k8s_private_image] + quay_k8s = KubernetesPodOperator( + namespace='default', + image='quay.io/apache/bash', + image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], + cmds=["bash", "-cx"], + arguments=["echo", "10", "echo pwd"], + labels={"foo": "bar"}, + name="airflow-private-image-pod", + is_delete_operator_pod=True, + in_cluster=True, + task_id="task-two", + get_logs=True, + ) + # [END howto_operator_k8s_private_image] + + # [START howto_operator_k8s_write_xcom] + write_xcom = KubernetesPodOperator( + namespace='default', + image='alpine', + cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], + name="write-xcom", + do_xcom_push=True, + is_delete_operator_pod=True, + in_cluster=True, + task_id="write-xcom", + get_logs=True, + ) + + pod_task_xcom_result = BashOperator( + bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", + task_id="pod_task_xcom_result", + ) + # [END howto_operator_k8s_write_xcom] + + # write_xcom >> pod_task_xcom_result diff --git a/airflow/providers/cncf/kubernetes/decorators/__init__.py b/airflow/providers/cncf/kubernetes/decorators/__init__.py new file mode 100644 index 0000000000000..217e5db960782 --- /dev/null +++ b/airflow/providers/cncf/kubernetes/decorators/__init__.py @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py new file mode 100644 index 0000000000000..66f640600330e --- /dev/null +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -0,0 +1,162 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import base64 +import inspect +import os +import pickle +from tempfile import TemporaryDirectory +from textwrap import dedent +from typing import TYPE_CHECKING, Callable, Optional, Sequence, TypeVar + +import dill + +from airflow.decorators.base import DecoratedOperator, task_decorator_factory +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +def _generate_decode_command(env_var, file): + # We don't need `f.close()` as the interpreter is about to exit anyway + return ( + f'python -c "import base64, os;' + rf'x = base64.b64decode(os.environ[\"{env_var}\"]);' + rf'f = open(\"{file}\", \"wb\"); f.write(x);"' + ) + + +def _b64_encode_file(filename): + with open(filename, "rb") as file_to_encode: + return base64.b64encode(file_to_encode.read()) + + +class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): + """ + Wraps a Python callable and captures args/kwargs when called for execution. + + :param python_callable: A reference to an object that is callable + :type python_callable: python callable + :param op_kwargs: a dictionary of keyword arguments that will get unpacked + in your function (templated) + :type op_kwargs: dict + :param op_args: a list of positional arguments that will get unpacked when + calling your callable (templated) + :type op_args: list + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. + Defaults to False. + :type multiple_outputs: bool + """ + + template_fields: Sequence[str] = ('op_args', 'op_kwargs') + + # since we won't mutate the arguments, we should just do the shallow copy + # there are some cases we can't deepcopy the objects (e.g protobuf). + shallow_copy_attrs: Sequence[str] = ('python_callable',) + + def __init__( + self, + use_dill=False, + **kwargs, + ) -> None: + print("KUBERNETES EXECUTOR" + str(kwargs)) + command = "dummy command" + self.pickling_library = dill if use_dill else pickle + super().__init__(**kwargs) + + def execute(self, context: 'Context'): + + with TemporaryDirectory(prefix='venv') as tmp_dir: + input_filename = os.path.join(tmp_dir, 'script.in') + script_filename = os.path.join(tmp_dir, 'script.py') + + with open(input_filename, 'wb') as file: + if self.op_args or self.op_kwargs: + self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) + py_source = self._get_python_source() + print("KUBERNETES DECORATOR" + py_source) + write_python_script( + jinja_context=dict( + op_args=self.op_args, + op_kwargs=self.op_kwargs, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=py_source, + string_args_global=False, + ), + filename=script_filename, + ) + + # Pass the python script to be executed, and the input args, via environment variables + # and its injected to the pod as a config map + # self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) + # if self.op_args or self.op_kwargs: + # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) + # else: + # self.environment["__PYTHON_INPUT"] = "" + + self.command = ( + f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&""" + f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&' + f'python /tmp/script.py /tmp/script.in /tmp/script.out\'' + ) + + print("KUBERNETES COMMAND" + self.command) + return super().execute(context) + + def _get_python_source(self): + raw_source = inspect.getsource(self.python_callable) + res = dedent(raw_source) + res = remove_task_decorator(res, "@task.kubernetes") + return res + + +T = TypeVar("T", bound=Callable) + + +class KubernetesDecoratorMixin: + """ + Mixin class used in provider managers to choose the right class + based on the decorator + + :meta private: + """ + + def kubernetes( + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ): + """ + Python operator decorator. Wraps a function into an Airflow operator. + Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_KubernetesDecoratedOperator, + **kwargs, + ) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 1ae72fb31acb3..0044146c7b7f0 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -401,6 +401,7 @@ def extract_xcom(self, pod): return json.loads(result) def execute(self, context: 'Context'): + print("KUBERNETES POD OPERATOR" + str(context)) remote_pod = None try: self.pod_request_obj = self.build_pod_request_obj(context) From 402ccaf0a2add722cd5ce614862d93e47c6b6a46 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 18 Jan 2022 16:49:56 -0500 Subject: [PATCH 04/27] Merged from master --- airflow/decorators/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 4e75f8fb1e762..e3c9c3a944f72 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -23,12 +23,14 @@ from airflow.decorators.task_group import task_group # noqa from airflow.models.dag import dag # noqa from airflow.providers.cncf.kubernetes.decorators.kubernetes import KubernetesDecoratorMixin + ======= from airflow.decorators.base import TaskDecorator from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group from airflow.models.dag import dag + >>>>>>> 41a420c1481b862559c92bc9286d8cdb7d436972 from airflow.providers_manager import ProvidersManager From e778325d0ea4973c079ba4b06f9dcea5c2bab036 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Tue, 18 Jan 2022 19:43:19 -0500 Subject: [PATCH 05/27] Changes related to decorator composition --- airflow/decorators/__init__.py | 23 +++------- airflow/decorators/__init__.pyi | 18 ++++++++ .../cncf/kubernetes/decorators/kubernetes.py | 45 ++++++++----------- 3 files changed, 41 insertions(+), 45 deletions(-) diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index e3c9c3a944f72..dcae301e81cf0 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -15,26 +15,16 @@ # specific language governing permissions and limitations # under the License. -<<<<<<< HEAD -from typing import TYPE_CHECKING - -from airflow.decorators.python import PythonDecoratorMixin, python_task # noqa -from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin -from airflow.decorators.task_group import task_group # noqa -from airflow.models.dag import dag # noqa -from airflow.providers.cncf.kubernetes.decorators.kubernetes import KubernetesDecoratorMixin - -======= from airflow.decorators.base import TaskDecorator +from airflow.decorators.branch_python import branch_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group from airflow.models.dag import dag - ->>>>>>> 41a420c1481b862559c92bc9286d8cdb7d436972 +from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task from airflow.providers_manager import ProvidersManager -__all__ = ["dag", "task", "task_group", "python_task", "virtualenv_task"] +__all__ = ["dag", "task", "task_group", "python_task", "virtualenv_task", "branch_task", "kubernetes_task"] class _TaskDecoratorFactory: @@ -42,16 +32,13 @@ class _TaskDecoratorFactory: python = staticmethod(python_task) virtualenv = staticmethod(virtualenv_task) + branch = staticmethod(branch_task) + kubernetes = staticmethod(kubernetes_task) -<<<<<<< HEAD -class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin, KubernetesDecoratorMixin): - def __getattr__(self, name): -======= __call__ = python # Alias '@task' to '@task.python'. def __getattr__(self, name: str) -> TaskDecorator: """Dynamically get provider-registered task decorators, e.g. ``@task.docker``.""" ->>>>>>> 41a420c1481b862559c92bc9286d8cdb7d436972 if name.startswith("__"): raise AttributeError(f'{type(self).__name__} has no attribute {name!r}') decorators = ProvidersManager().taskflow_decorators diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 00b79c977252e..db0884c487917 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -141,6 +141,24 @@ class TaskDecoratorFactory: """ @overload def virtualenv(self, python_callable: F) -> F: ... + @overload + def kubernetes( + python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ) -> TaskDecorator: + """ + Python operator decorator. Wraps a function into an Airflow operator. + Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + """ + @overload + def kubernetes(self, python_callable: F) -> F: ... # [START decorator_signature] @overload def docker( diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 66f640600330e..059ad3e29cde5 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -82,7 +82,6 @@ def __init__( super().__init__(**kwargs) def execute(self, context: 'Context'): - with TemporaryDirectory(prefix='venv') as tmp_dir: input_filename = os.path.join(tmp_dir, 'script.in') script_filename = os.path.join(tmp_dir, 'script.py') @@ -131,32 +130,24 @@ def _get_python_source(self): T = TypeVar("T", bound=Callable) -class KubernetesDecoratorMixin: +def kubernetes_task( + python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs +): """ - Mixin class used in provider managers to choose the right class - based on the decorator + Python operator decorator. Wraps a function into an Airflow operator. + Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. - :meta private: + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool """ - - def kubernetes( - self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs - ): - """ - Python operator decorator. Wraps a function into an Airflow operator. - Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. - - :param python_callable: Function to decorate - :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool - """ - return task_decorator_factory( - python_callable=python_callable, - multiple_outputs=multiple_outputs, - decorated_operator_class=_KubernetesDecoratedOperator, - **kwargs, - ) + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_KubernetesDecoratedOperator, + **kwargs, + ) From 9ef371f1472ba10dc6cee7ba1077488f81bc7faa Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Wed, 19 Jan 2022 19:47:11 -0500 Subject: [PATCH 06/27] Added kubernetes executor --- airflow/decorators/__init__.py | 4 +--- airflow/example_dags/example_kubernetes.py | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index dcae301e81cf0..3ce03be618466 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -16,7 +16,6 @@ # under the License. from airflow.decorators.base import TaskDecorator -from airflow.decorators.branch_python import branch_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group @@ -24,7 +23,7 @@ from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task from airflow.providers_manager import ProvidersManager -__all__ = ["dag", "task", "task_group", "python_task", "virtualenv_task", "branch_task", "kubernetes_task"] +__all__ = ["dag", "task", "task_group", "python_task", "virtualenv_task", "kubernetes_task"] class _TaskDecoratorFactory: @@ -32,7 +31,6 @@ class _TaskDecoratorFactory: python = staticmethod(python_task) virtualenv = staticmethod(virtualenv_task) - branch = staticmethod(branch_task) kubernetes = staticmethod(kubernetes_task) __call__ = python # Alias '@task' to '@task.python'. diff --git a/airflow/example_dags/example_kubernetes.py b/airflow/example_dags/example_kubernetes.py index ff914d61a4a86..67ad1363d5fb8 100644 --- a/airflow/example_dags/example_kubernetes.py +++ b/airflow/example_dags/example_kubernetes.py @@ -27,6 +27,7 @@ from airflow.decorators import dag, task from airflow.kubernetes.secret import Secret from airflow.operators.bash import BashOperator +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator # [START howto_operator_k8s_cluster_resources] secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') From 857700f1cd5eed63885b4435c39a8e03fafb557b Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sat, 22 Jan 2022 20:07:33 -0500 Subject: [PATCH 07/27] Added jinja template for kubernetes decorator, fixed logic of passing source code as env variables --- airflow/example_dags/example_kubernetes.py | 171 --------------- .../example_kubernetes_decorator.py | 61 ++++++ .../cncf/kubernetes/decorators/kubernetes.py | 205 ++++++++++++++---- .../kubernetes/operators/kubernetes_pod.py | 1 - airflow/utils/python_kubernetes_script.jinja2 | 44 ++++ 5 files changed, 269 insertions(+), 213 deletions(-) delete mode 100644 airflow/example_dags/example_kubernetes.py create mode 100644 airflow/example_dags/example_kubernetes_decorator.py create mode 100644 airflow/utils/python_kubernetes_script.jinja2 diff --git a/airflow/example_dags/example_kubernetes.py b/airflow/example_dags/example_kubernetes.py deleted file mode 100644 index 67ad1363d5fb8..0000000000000 --- a/airflow/example_dags/example_kubernetes.py +++ /dev/null @@ -1,171 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -""" -This is an example dag for using the KubernetesPodOperator. -""" - -from datetime import datetime - -from kubernetes.client import models as k8s - -from airflow import DAG -from airflow.decorators import dag, task -from airflow.kubernetes.secret import Secret -from airflow.operators.bash import BashOperator -from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator - -# [START howto_operator_k8s_cluster_resources] -secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') -secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') -secret_all_keys = Secret('env', None, 'airflow-secrets-2') -volume_mount = k8s.V1VolumeMount( - name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True -) - -configmaps = [ - k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-1')), - k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='test-configmap-2')), -] - -volume = k8s.V1Volume( - name='test-volume', - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), -) - -port = k8s.V1ContainerPort(name='http', container_port=80) - -init_container_volume_mounts = [ - k8s.V1VolumeMount(mount_path='/etc/foo', name='test-volume', sub_path=None, read_only=True) -] - -init_environments = [k8s.V1EnvVar(name='key1', value='value1'), k8s.V1EnvVar(name='key2', value='value2')] - -init_container = k8s.V1Container( - name="init-container", - image="ubuntu:16.04", - env=init_environments, - volume_mounts=init_container_volume_mounts, - command=["bash", "-cx"], - args=["echo 10"], -) - -affinity = k8s.V1Affinity( - node_affinity=k8s.V1NodeAffinity( - preferred_during_scheduling_ignored_during_execution=[ - k8s.V1PreferredSchedulingTerm( - weight=1, - preference=k8s.V1NodeSelectorTerm( - match_expressions=[ - k8s.V1NodeSelectorRequirement(key="disktype", operator="in", values=["ssd"]) - ] - ), - ) - ] - ), - pod_affinity=k8s.V1PodAffinity( - required_during_scheduling_ignored_during_execution=[ - k8s.V1WeightedPodAffinityTerm( - weight=1, - pod_affinity_term=k8s.V1PodAffinityTerm( - label_selector=k8s.V1LabelSelector( - match_expressions=[ - k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1") - ] - ), - topology_key="failure-domain.beta.kubernetes.io/zone", - ), - ) - ] - ), -) - -tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")] - -# [END howto_operator_k8s_cluster_resources] - - -with DAG( - dag_id='example_kubernetes_operator_3', - schedule_interval=None, - start_date=datetime(2021, 1, 1), - tags=['example'], -) as dag: - k = KubernetesPodOperator( - namespace='default', - image="ubuntu:16.04", - cmds=["bash", "-cx"], - arguments=["echo", "10"], - labels={"foo": "bar"}, - # secrets=[secret_file, secret_env, secret_all_keys], - ports=[port], - # volumes=[volume], - # volume_mounts=[volume_mount], - # env_from=configmaps, - name="airflow-test-pod", - task_id="task", - # affinity=affinity, - is_delete_operator_pod=True, - hostnetwork=False, - tolerations=tolerations, - # init_containers=[init_container], - priority_class_name="medium", - ) - - @task.kubernetes(image='python:3.9-slim-buster', name='k8s_test', namespace='default') - def execute_in_k8s_pod(): - print("Hello from k8s pod") - - execute_in_k8s_pod_instance = execute_in_k8s_pod() - k >> execute_in_k8s_pod_instance - - # [START howto_operator_k8s_private_image] - quay_k8s = KubernetesPodOperator( - namespace='default', - image='quay.io/apache/bash', - image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], - cmds=["bash", "-cx"], - arguments=["echo", "10", "echo pwd"], - labels={"foo": "bar"}, - name="airflow-private-image-pod", - is_delete_operator_pod=True, - in_cluster=True, - task_id="task-two", - get_logs=True, - ) - # [END howto_operator_k8s_private_image] - - # [START howto_operator_k8s_write_xcom] - write_xcom = KubernetesPodOperator( - namespace='default', - image='alpine', - cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"], - name="write-xcom", - do_xcom_push=True, - is_delete_operator_pod=True, - in_cluster=True, - task_id="write-xcom", - get_logs=True, - ) - - pod_task_xcom_result = BashOperator( - bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"", - task_id="pod_task_xcom_result", - ) - # [END howto_operator_k8s_write_xcom] - - # write_xcom >> pod_task_xcom_result diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py new file mode 100644 index 0000000000000..7a407b7d62ba6 --- /dev/null +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -0,0 +1,61 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +This is an example dag for using the KubernetesPodOperator. +""" + +from datetime import datetime + +from airflow import DAG +from airflow.decorators import dag, task + +with DAG( + dag_id='example_kubernetes_decorator', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + tags=['example'], +) as dag: + + @task.kubernetes(image='python:3.9-slim-buster', name='k8s_test', namespace='default') + def execute_in_k8s_pod(): + import time + + print("Hello from k8s pod") + time.sleep(100) + + @task.kubernetes() + def print_pattern(): + line = "Kubernetes Decorator" + pat = "" + for i in range(0, line): + for j in range(0, line): + if ( + (j == 1 and i != 0 and i != line - 1) + or ((i == 0 or i == line - 1) and j > 1 and j < line - 2) + or (i == ((line - 1) / 2) and j > line - 5 and j < line - 1) + or (j == line - 2 and i != 0 and i != line - 1 and i >= ((line - 1) / 2)) + ): + pat = pat + "*" + else: + pat = pat + " " + pat = pat + "\n" + return pat + + execute_in_k8s_pod_instance = execute_in_k8s_pod() + print_pattern_instance = print_pattern() + execute_in_k8s_pod_instance >> print_pattern_instance diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 059ad3e29cde5..8f101304cb8c9 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -15,15 +15,15 @@ # specific language governing permissions and limitations # under the License. -import base64 import inspect import os import pickle -from tempfile import TemporaryDirectory +import tempfile from textwrap import dedent from typing import TYPE_CHECKING, Callable, Optional, Sequence, TypeVar import dill +from kubernetes.client import models as k8s from airflow.decorators.base import DecoratedOperator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator @@ -37,14 +37,18 @@ def _generate_decode_command(env_var, file): # We don't need `f.close()` as the interpreter is about to exit anyway return ( f'python -c "import base64, os;' - rf'x = base64.b64decode(os.environ[\"{env_var}\"]);' - rf'f = open(\"{file}\", \"wb\"); f.write(x);"' + # rf'x = base64.b64decode(os.environ[\"{env_var}\"]);' + # r + rf'x = os.environ[\"{env_var}\"];' + # r + rf'f = open(\"{file}\", \"w\"); f.write(x);"' ) def _b64_encode_file(filename): - with open(filename, "rb") as file_to_encode: - return base64.b64encode(file_to_encode.read()) + with open(filename) as file_to_encode: + # return base64.b64encode(file_to_encode.read()) + return file_to_encode.read() class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): @@ -79,46 +83,165 @@ def __init__( print("KUBERNETES EXECUTOR" + str(kwargs)) command = "dummy command" self.pickling_library = dill if use_dill else pickle + + # Image, name and namespace are all required. + if not 'image' in kwargs: + kwargs['image'] = 'python' + + if not 'name' in kwargs: + kwargs['name'] = 'test' + + if not 'namespace' in kwargs: + kwargs['namespace'] = 'default' + super().__init__(**kwargs) - def execute(self, context: 'Context'): - with TemporaryDirectory(prefix='venv') as tmp_dir: - input_filename = os.path.join(tmp_dir, 'script.in') - script_filename = os.path.join(tmp_dir, 'script.py') - - with open(input_filename, 'wb') as file: - if self.op_args or self.op_kwargs: - self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) - py_source = self._get_python_source() - print("KUBERNETES DECORATOR" + py_source) - write_python_script( - jinja_context=dict( - op_args=self.op_args, - op_kwargs=self.op_kwargs, - pickling_library=self.pickling_library.__name__, - python_callable=self.python_callable.__name__, - python_callable_source=py_source, - string_args_global=False, - ), - filename=script_filename, - ) + def write_python_script( + jinja_context: dict, + filename: str, + render_template_as_native_obj: bool = False, + ): + """ + Renders the python script to a file to execute in the virtual environment. - # Pass the python script to be executed, and the input args, via environment variables - # and its injected to the pod as a config map - # self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) - # if self.op_args or self.op_kwargs: - # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) - # else: - # self.environment["__PYTHON_INPUT"] = "" - - self.command = ( - f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&""" - f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&' - f'python /tmp/script.py /tmp/script.in /tmp/script.out\'' + :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the + template file. + :type jinja_context: dict + :param filename: The name of the file to dump the rendered script to. + :type filename: str + :param render_template_as_native_obj: If ``True``, rendered Jinja template would be converted + to a native Python object + """ + template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__)) + template_env: jinja2.Environment + if render_template_as_native_obj: + template_env = jinja2.nativetypes.NativeEnvironment( + loader=template_loader, undefined=jinja2.StrictUndefined ) + else: + template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined) + template = template_env.get_template('python_virtualenv_script.jinja2') + + template.stream(**jinja_context).dump(filename) + + def execute(self, context: 'Context'): + + tmp_dir = tempfile.mkdtemp(prefix='venv') + # tempfile.mkdtemp() + input_filename = os.path.join(tmp_dir, 'script.in') + script_filename = os.path.join(tmp_dir, 'script.py') + + with open(input_filename, 'wb') as file: + if self.op_args or self.op_kwargs: + self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) + py_source = self._get_python_source() + print("KUBERNETES DECORATOR" + py_source) + + write_python_script( + jinja_context=dict( + op_args=self.op_args, + op_kwargs=self.op_kwargs, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=py_source, + string_args_global=False, + ), + filename=script_filename, + ) + + # Pass the python script to be executed, and the input args, via environment variables + # and its injected to the pod as a config map + # self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) + # if self.op_args or self.op_kwargs: + # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) + # else: + # self.environment["__PYTHON_INPUT"] = "" + # + # self.command = ( + # f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&""" + # f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&' + # f'python /tmp/script.py /tmp/script.in /tmp/script.out\'' + # ) + + # volumeMounts: + # - mountPath: / singlefile / hosts + # name: etc + # subPath: hosts + # + # + # volumes: + # - name: etc + # hostPath: + # path: / etc + + # directory_prefix = script_filename.rsplit('/',1)[0] + # + # #directory_prefix = '/tmp/venv8upj253q' + '/' + # file_name = script_filename.rsplit('/',1)[1] + # + # + # #script_filename='/tmp_node/venv8upj253q/script.py' + # self.volumes.append(k8s.V1Volume(name='script-2', host_path= + # k8s.V1HostPathVolumeSource(path=script_filename, type='File'))) + # self.volume_mounts.append(k8s.V1VolumeMount(mount_path=script_filename, name='script-2', read_only=True)) + # #sub_path=file_name)) + + # Pass the python script to be executed, and the input args, via environment variables + # and its injected to the pod as a config map + self.environment = {} + self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) + + print("ENVIRONMENT" + str(self.environment)) + # if self.op_args or self.op_kwargs: + # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) + # else: + # self.environment["__PYTHON_INPUT"] = "" + + self.env_vars.append(k8s.V1EnvVar(name="__PYTHON_SCRIPT", value=_b64_encode_file(script_filename))) + + self.cmds.append("bash") + self.arguments.append("-cx") + argument_string = ( + f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py' + ) + print("ARGUMENT" + argument_string) + + self.arguments.append( + f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py && sleep 1000' + ) + + print("COMMAND" + str(self.cmds)) + + print("ARGUMENTS" + str(self.arguments)) + + # self.command = f"python {script_filename}" + # self.command = f"ls /tmp" + # self.command = "python" + + # self.cmds.append(self.command) + # self.arguments.append(script_filename) + # self.arguments.append(script_filename) + # self.arguments.append(script_filename) + # self.arguments.append("/tmp/script.in") + # self.arguments.append("/tmp/script.out") + + # volumes: + # - name: config - volume + # configMap: + # name: sherlock - config + + # cmap = k8s.V1ConfigMap() + # cmap.data = {} + # cmap.data['script'] = script_filename + # + # vol_source = k8s.V1ConfigMapVolumeSource(name='script') + # + # self.volumes.append(k8s.V1Volume(name='config-volume', config_map=vol_source)) + # + # self.env_from.append(k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='script'))) - print("KUBERNETES COMMAND" + self.command) - return super().execute(context) + # print("KUBERNETES COMMAND" + self.command) + return super().execute(context) def _get_python_source(self): raw_source = inspect.getsource(self.python_callable) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 0044146c7b7f0..1ae72fb31acb3 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -401,7 +401,6 @@ def extract_xcom(self, pod): return json.loads(result) def execute(self, context: 'Context'): - print("KUBERNETES POD OPERATOR" + str(context)) remote_pod = None try: self.pod_request_obj = self.build_pod_request_obj(context) diff --git a/airflow/utils/python_kubernetes_script.jinja2 b/airflow/utils/python_kubernetes_script.jinja2 new file mode 100644 index 0000000000000..c961f10de4e5c --- /dev/null +++ b/airflow/utils/python_kubernetes_script.jinja2 @@ -0,0 +1,44 @@ +{# + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +-#} + +import {{ pickling_library }} +import sys + +{# Check whether Airflow is available in the environment. + # If it is, we'll want to ensure that we integrate any macros that are being provided + # by plugins prior to unpickling the task context. #} +if sys.version_info >= (3,6): + try: + from airflow.plugins_manager import integrate_macros_plugins + integrate_macros_plugins() + except ImportError: + {# Airflow is not available in this environment, therefore we won't + # be able to integrate any plugin macros. #} + pass + +{% if op_args or op_kwargs %} +with open(sys.argv[1], "rb") as file: + arg_dict = {{ pickling_library }}.load(file) +{% else %} +arg_dict = {"args": [], "kwargs": {}} +{% endif %} + +# Script +{{ python_callable_source }} +res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"]) From 1e2ae1178c34e248388d72e78f7babc2476ef8e8 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 23 Jan 2022 11:00:03 -0500 Subject: [PATCH 08/27] Refactoring of Kubernetes decorator --- airflow/decorators/__init__.pyi | 6 +- .../cncf/kubernetes/decorators/kubernetes.py | 155 ++++-------------- 2 files changed, 34 insertions(+), 127 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index db0884c487917..1cd28324447ae 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -145,9 +145,9 @@ class TaskDecoratorFactory: def kubernetes( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs ) -> TaskDecorator: - """ - Python operator decorator. Wraps a function into an Airflow operator. - Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. + """Wraps a function to be executed on a k8s pod using KubernetesPodOperator + + Also accepts any argument that KubernetesPodOperator will via ``kwargs``. :param python_callable: Function to decorate :type python_callable: Optional[Callable] diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 8f101304cb8c9..ef64bef278407 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -18,7 +18,6 @@ import inspect import os import pickle -import tempfile from textwrap import dedent from typing import TYPE_CHECKING, Callable, Optional, Sequence, TypeVar @@ -37,23 +36,19 @@ def _generate_decode_command(env_var, file): # We don't need `f.close()` as the interpreter is about to exit anyway return ( f'python -c "import base64, os;' - # rf'x = base64.b64decode(os.environ[\"{env_var}\"]);' - # r rf'x = os.environ[\"{env_var}\"];' - # r rf'f = open(\"{file}\", \"w\"); f.write(x);"' ) def _b64_encode_file(filename): with open(filename) as file_to_encode: - # return base64.b64encode(file_to_encode.read()) return file_to_encode.read() class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): """ - Wraps a Python callable and captures args/kwargs when called for execution. + Wraps a Python callable and executes in a kubernetes pod :param python_callable: A reference to an object that is callable :type python_callable: python callable @@ -120,128 +115,40 @@ def write_python_script( ) else: template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined) - template = template_env.get_template('python_virtualenv_script.jinja2') + template = template_env.get_template('python_kubernetes_script.jinja2') template.stream(**jinja_context).dump(filename) def execute(self, context: 'Context'): - tmp_dir = tempfile.mkdtemp(prefix='venv') - # tempfile.mkdtemp() - input_filename = os.path.join(tmp_dir, 'script.in') - script_filename = os.path.join(tmp_dir, 'script.py') - - with open(input_filename, 'wb') as file: - if self.op_args or self.op_kwargs: - self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file) - py_source = self._get_python_source() - print("KUBERNETES DECORATOR" + py_source) - - write_python_script( - jinja_context=dict( - op_args=self.op_args, - op_kwargs=self.op_kwargs, - pickling_library=self.pickling_library.__name__, - python_callable=self.python_callable.__name__, - python_callable_source=py_source, - string_args_global=False, - ), - filename=script_filename, - ) - - # Pass the python script to be executed, and the input args, via environment variables - # and its injected to the pod as a config map - # self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) - # if self.op_args or self.op_kwargs: - # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) - # else: - # self.environment["__PYTHON_INPUT"] = "" - # - # self.command = ( - # f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&""" - # f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&' - # f'python /tmp/script.py /tmp/script.in /tmp/script.out\'' - # ) - - # volumeMounts: - # - mountPath: / singlefile / hosts - # name: etc - # subPath: hosts - # - # - # volumes: - # - name: etc - # hostPath: - # path: / etc - - # directory_prefix = script_filename.rsplit('/',1)[0] - # - # #directory_prefix = '/tmp/venv8upj253q' + '/' - # file_name = script_filename.rsplit('/',1)[1] - # - # - # #script_filename='/tmp_node/venv8upj253q/script.py' - # self.volumes.append(k8s.V1Volume(name='script-2', host_path= - # k8s.V1HostPathVolumeSource(path=script_filename, type='File'))) - # self.volume_mounts.append(k8s.V1VolumeMount(mount_path=script_filename, name='script-2', read_only=True)) - # #sub_path=file_name)) - - # Pass the python script to be executed, and the input args, via environment variables - # and its injected to the pod as a config map - self.environment = {} - self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename) - - print("ENVIRONMENT" + str(self.environment)) - # if self.op_args or self.op_kwargs: - # self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) - # else: - # self.environment["__PYTHON_INPUT"] = "" - - self.env_vars.append(k8s.V1EnvVar(name="__PYTHON_SCRIPT", value=_b64_encode_file(script_filename))) - - self.cmds.append("bash") - self.arguments.append("-cx") - argument_string = ( - f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py' - ) - print("ARGUMENT" + argument_string) - - self.arguments.append( - f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py && sleep 1000' - ) - - print("COMMAND" + str(self.cmds)) - - print("ARGUMENTS" + str(self.arguments)) - - # self.command = f"python {script_filename}" - # self.command = f"ls /tmp" - # self.command = "python" - - # self.cmds.append(self.command) - # self.arguments.append(script_filename) - # self.arguments.append(script_filename) - # self.arguments.append(script_filename) - # self.arguments.append("/tmp/script.in") - # self.arguments.append("/tmp/script.out") - - # volumes: - # - name: config - volume - # configMap: - # name: sherlock - config - - # cmap = k8s.V1ConfigMap() - # cmap.data = {} - # cmap.data['script'] = script_filename - # - # vol_source = k8s.V1ConfigMapVolumeSource(name='script') - # - # self.volumes.append(k8s.V1Volume(name='config-volume', config_map=vol_source)) - # - # self.env_from.append(k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name='script'))) - - # print("KUBERNETES COMMAND" + self.command) - return super().execute(context) + with TemporaryDirectory(prefix='venv') as tmp_dir: + script_filename = os.path.join(tmp_dir, 'script.py') + py_source = self._get_python_source() + + write_python_script( + jinja_context=dict( + op_args=self.op_args, + op_kwargs=self.op_kwargs, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=py_source, + string_args_global=False, + ), + filename=script_filename, + ) + + self.env_vars.append( + k8s.V1EnvVar(name="__PYTHON_SCRIPT", value=_b64_encode_file(script_filename)) + ) + + self.cmds.append("bash") + self.arguments.append("-cx") + + self.arguments.append( + f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py && sleep 1000' + ) + + return super().execute(context) def _get_python_source(self): raw_source = inspect.getsource(self.python_callable) @@ -257,7 +164,7 @@ def kubernetes_task( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs ): """ - Python operator decorator. Wraps a function into an Airflow operator. + Kubernetes operator decorator. Wraps a function to be executed in K8s using KubernetesPodOperator. Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. :param python_callable: Function to decorate From 33d6fff121737846814096a31c3d8cee252746bd Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 24 Jan 2022 13:59:14 -0500 Subject: [PATCH 09/27] Clean up and added test case --- .../example_kubernetes_decorator.py | 30 ++++------ .../cncf/kubernetes/decorators/kubernetes.py | 59 +++++-------------- airflow/utils/python_virtualenv.py | 3 +- .../cncf/kubernetes/decorators/__init__.py | 0 .../kubernetes/decorators/test_kubernetes.py | 51 ++++++++++++++++ 5 files changed, 82 insertions(+), 61 deletions(-) create mode 100644 tests/providers/cncf/kubernetes/decorators/__init__.py create mode 100644 tests/providers/cncf/kubernetes/decorators/test_kubernetes.py diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py index 7a407b7d62ba6..24d6030c2bf2e 100644 --- a/airflow/example_dags/example_kubernetes_decorator.py +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -31,30 +31,26 @@ tags=['example'], ) as dag: - @task.kubernetes(image='python:3.9-slim-buster', name='k8s_test', namespace='default') + @task.kubernetes(image='python:3.8-slim-buster', name='k8s_test', namespace='default') def execute_in_k8s_pod(): import time print("Hello from k8s pod") - time.sleep(100) + time.sleep(2) @task.kubernetes() def print_pattern(): - line = "Kubernetes Decorator" - pat = "" - for i in range(0, line): - for j in range(0, line): - if ( - (j == 1 and i != 0 and i != line - 1) - or ((i == 0 or i == line - 1) and j > 1 and j < line - 2) - or (i == ((line - 1) / 2) and j > line - 5 and j < line - 1) - or (j == line - 2 and i != 0 and i != line - 1 and i >= ((line - 1) / 2)) - ): - pat = pat + "*" - else: - pat = pat + " " - pat = pat + "\n" - return pat + n = 5 + for i in range(0, n): + # inner loop to handle number of columns + # values changing acc. to outer loop + for j in range(0, i + 1): + # printing stars + print("* ", end="") + + # ending line after each row + print("\r") + execute_in_k8s_pod_instance = execute_in_k8s_pod() print_pattern_instance = print_pattern() diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index ef64bef278407..f86c08aa8a1f2 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -18,6 +18,8 @@ import inspect import os import pickle +import uuid +from tempfile import TemporaryDirectory from textwrap import dedent from typing import TYPE_CHECKING, Callable, Optional, Sequence, TypeVar @@ -41,9 +43,9 @@ def _generate_decode_command(env_var, file): ) -def _b64_encode_file(filename): - with open(filename) as file_to_encode: - return file_to_encode.read() +def _read_file_contents(filename): + with open(filename) as script_file: + return script_file.read() class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): @@ -72,80 +74,51 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): def __init__( self, - use_dill=False, **kwargs, ) -> None: - print("KUBERNETES EXECUTOR" + str(kwargs)) - command = "dummy command" - self.pickling_library = dill if use_dill else pickle + self.pickling_library = pickle # Image, name and namespace are all required. if not 'image' in kwargs: - kwargs['image'] = 'python' + kwargs['image'] = 'python:3.8-slim-buster' if not 'name' in kwargs: - kwargs['name'] = 'test' + kwargs['name'] = f'k8s_airflow_pod_{uuid.uuid4().hex}' if not 'namespace' in kwargs: kwargs['namespace'] = 'default' super().__init__(**kwargs) - def write_python_script( - jinja_context: dict, - filename: str, - render_template_as_native_obj: bool = False, - ): - """ - Renders the python script to a file to execute in the virtual environment. - - :param jinja_context: The jinja context variables to unpack and replace with its placeholders in the - template file. - :type jinja_context: dict - :param filename: The name of the file to dump the rendered script to. - :type filename: str - :param render_template_as_native_obj: If ``True``, rendered Jinja template would be converted - to a native Python object - """ - template_loader = jinja2.FileSystemLoader(searchpath=os.path.dirname(__file__)) - template_env: jinja2.Environment - if render_template_as_native_obj: - template_env = jinja2.nativetypes.NativeEnvironment( - loader=template_loader, undefined=jinja2.StrictUndefined - ) - else: - template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined) - template = template_env.get_template('python_kubernetes_script.jinja2') - - template.stream(**jinja_context).dump(filename) - def execute(self, context: 'Context'): with TemporaryDirectory(prefix='venv') as tmp_dir: script_filename = os.path.join(tmp_dir, 'script.py') py_source = self._get_python_source() - write_python_script( - jinja_context=dict( + jinja_context = dict( op_args=self.op_args, op_kwargs=self.op_kwargs, pickling_library=self.pickling_library.__name__, python_callable=self.python_callable.__name__, python_callable_source=py_source, string_args_global=False, - ), + ) + write_python_script( + jinja_context=jinja_context, filename=script_filename, + template_file='python_kubernetes_script.jinja2' ) self.env_vars.append( - k8s.V1EnvVar(name="__PYTHON_SCRIPT", value=_b64_encode_file(script_filename)) + k8s.V1EnvVar(name="__PYTHON_SCRIPT", value=_read_file_contents(script_filename)) ) self.cmds.append("bash") - self.arguments.append("-cx") + self.arguments.append("-cx") self.arguments.append( - f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py && sleep 1000' + f'{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} && python /tmp/script.py' ) return super().execute(context) diff --git a/airflow/utils/python_virtualenv.py b/airflow/utils/python_virtualenv.py index 6b7265456a321..7676406104304 100644 --- a/airflow/utils/python_virtualenv.py +++ b/airflow/utils/python_virtualenv.py @@ -121,6 +121,7 @@ def write_python_script( jinja_context: dict, filename: str, render_template_as_native_obj: bool = False, + template_file: str = 'python_virtualenv_script.jinja2', ): """ Renders the python script to a file to execute in the virtual environment. @@ -141,5 +142,5 @@ def write_python_script( ) else: template_env = jinja2.Environment(loader=template_loader, undefined=jinja2.StrictUndefined) - template = template_env.get_template('python_virtualenv_script.jinja2') + template = template_env.get_template(template_file) template.stream(**jinja_context).dump(filename) diff --git a/tests/providers/cncf/kubernetes/decorators/__init__.py b/tests/providers/cncf/kubernetes/decorators/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py new file mode 100644 index 0000000000000..dcced7e182851 --- /dev/null +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from unittest import mock +from unittest.mock import MagicMock + +import pytest +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2021, 9, 1) + + +class TestKubernetesDecorator: + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") + def test_basic_kubernetes_operator(self, execute_mock, init_mock, dag_maker): + + @task + def dummy_f(): + pass + + @task.kubernetes(task_id="kubernetes_operator") + def f(): + import random + return [random.random() for _ in range(100)] + + with dag_maker(): + df = dummy_f() + ret = f() + df.set_downstream(ret) + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] + assert len(ti.xcom_pull()) == 100 From 7fd13d7264f67af320f1fa9ad8852a4847e9970d Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 24 Jan 2022 15:35:55 -0500 Subject: [PATCH 10/27] Clean up and added test case --- .../example_kubernetes_decorator.py | 1 - .../cncf/kubernetes/decorators/kubernetes.py | 17 ++++++++--------- .../cncf/kubernetes/decorators/__init__.py | 16 ++++++++++++++++ .../kubernetes/decorators/test_kubernetes.py | 6 ++---- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py index 24d6030c2bf2e..24574c4a2c03e 100644 --- a/airflow/example_dags/example_kubernetes_decorator.py +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -51,7 +51,6 @@ def print_pattern(): # ending line after each row print("\r") - execute_in_k8s_pod_instance = execute_in_k8s_pod() print_pattern_instance = print_pattern() execute_in_k8s_pod_instance >> print_pattern_instance diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index f86c08aa8a1f2..029f26974f96f 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -23,7 +23,6 @@ from textwrap import dedent from typing import TYPE_CHECKING, Callable, Optional, Sequence, TypeVar -import dill from kubernetes.client import models as k8s from airflow.decorators.base import DecoratedOperator, task_decorator_factory @@ -97,17 +96,17 @@ def execute(self, context: 'Context'): py_source = self._get_python_source() jinja_context = dict( - op_args=self.op_args, - op_kwargs=self.op_kwargs, - pickling_library=self.pickling_library.__name__, - python_callable=self.python_callable.__name__, - python_callable_source=py_source, - string_args_global=False, - ) + op_args=self.op_args, + op_kwargs=self.op_kwargs, + pickling_library=self.pickling_library.__name__, + python_callable=self.python_callable.__name__, + python_callable_source=py_source, + string_args_global=False, + ) write_python_script( jinja_context=jinja_context, filename=script_filename, - template_file='python_kubernetes_script.jinja2' + template_file='python_kubernetes_script.jinja2', ) self.env_vars.append( diff --git a/tests/providers/cncf/kubernetes/decorators/__init__.py b/tests/providers/cncf/kubernetes/decorators/__init__.py index e69de29bb2d1d..13a83393a9124 100644 --- a/tests/providers/cncf/kubernetes/decorators/__init__.py +++ b/tests/providers/cncf/kubernetes/decorators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py index dcced7e182851..11666e2a89513 100644 --- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -16,11 +16,9 @@ # under the License. from unittest import mock -from unittest.mock import MagicMock -import pytest + from airflow.decorators import task -from airflow.models.dag import DAG from airflow.utils import timezone DEFAULT_DATE = timezone.datetime(2021, 9, 1) @@ -30,7 +28,6 @@ class TestKubernetesDecorator: @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") def test_basic_kubernetes_operator(self, execute_mock, init_mock, dag_maker): - @task def dummy_f(): pass @@ -38,6 +35,7 @@ def dummy_f(): @task.kubernetes(task_id="kubernetes_operator") def f(): import random + return [random.random() for _ in range(100)] with dag_maker(): From f1c89a1359a37635355a50c077147727a0c3a887 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 24 Jan 2022 18:10:19 -0500 Subject: [PATCH 11/27] Added test case for kubernetes decorator --- .../cncf/kubernetes/decorators/kubernetes.py | 10 ++++- .../kubernetes/decorators/test_kubernetes.py | 45 +++++++++++++------ 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 029f26974f96f..70142045b0abf 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -25,7 +25,7 @@ from kubernetes.client import models as k8s -from airflow.decorators.base import DecoratedOperator, task_decorator_factory +from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script @@ -87,6 +87,12 @@ def __init__( if not 'namespace' in kwargs: kwargs['namespace'] = 'default' + kwargs_to_upstream = { + "python_callable": kwargs["python_callable"], + "op_args": kwargs["op_args"], + "op_kwargs": kwargs["op_kwargs"], + } + super().__init__(**kwargs) def execute(self, context: 'Context'): @@ -134,7 +140,7 @@ def _get_python_source(self): def kubernetes_task( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs -): +) -> TaskDecorator: """ Kubernetes operator decorator. Wraps a function to be executed in K8s using KubernetesPodOperator. Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py index 11666e2a89513..cde31243cb92d 100644 --- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -17,7 +17,6 @@ from unittest import mock - from airflow.decorators import task from airflow.utils import timezone @@ -26,24 +25,42 @@ class TestKubernetesDecorator: @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") - @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") - def test_basic_kubernetes_operator(self, execute_mock, init_mock, dag_maker): - @task - def dummy_f(): - pass + # @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") + def test_task_creation_with_params(self, execute_mock, dag_maker): + @task.kubernetes(image='python:3.8-slim-buster', name='k8s_test', namespace='default') + def k8s_decorator_func(): + print("decorator func") + + with dag_maker(): + ret = k8s_decorator_func() + ret + + dr = dag_maker.create_dagrun() + ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) - @task.kubernetes(task_id="kubernetes_operator") - def f(): - import random + ti = dr.get_task_instances()[0] + assert ti.task_id == 'k8s_decorator_func' + assert ti.state == 'success' - return [random.random() for _ in range(100)] + @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") + # @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") + def test_task_creation_default_params(self, execute_mock, dag_maker): + @task.kubernetes() + def k8s_decorator_func(): + print("decorator func") with dag_maker(): - df = dummy_f() - ret = f() - df.set_downstream(ret) + ret = k8s_decorator_func() + ret dr = dag_maker.create_dagrun() ret.operator.run(start_date=dr.execution_date, end_date=dr.execution_date) + ti = dr.get_task_instances()[0] - assert len(ti.xcom_pull()) == 100 + assert ti.task_id == 'k8s_decorator_func' + assert ti.state == 'success' + + # Default pod parameters + assert ret.operator.cmds[0] == 'bash' + assert ret.operator.arguments[0] == '-cx' + assert ret.operator.env_vars[0].name == '__PYTHON_SCRIPT' From 084c7128e537b5ba15622540ced6eeb7f109162a Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Fri, 28 Jan 2022 13:42:38 -0500 Subject: [PATCH 12/27] Removed unused code --- tests/providers/cncf/kubernetes/decorators/test_kubernetes.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py index cde31243cb92d..e502f5e934e53 100644 --- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -25,7 +25,6 @@ class TestKubernetesDecorator: @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") - # @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") def test_task_creation_with_params(self, execute_mock, dag_maker): @task.kubernetes(image='python:3.8-slim-buster', name='k8s_test', namespace='default') def k8s_decorator_func(): @@ -43,7 +42,6 @@ def k8s_decorator_func(): assert ti.state == 'success' @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") - # @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.__init__") def test_task_creation_default_params(self, execute_mock, dag_maker): @task.kubernetes() def k8s_decorator_func(): From cd5911c2d0792fdfdbb65cf35a0bb2c53c582ef8 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 31 Jan 2022 19:44:27 -0500 Subject: [PATCH 13/27] Fix flake8, lint errors --- airflow/example_dags/example_kubernetes_decorator.py | 2 +- .../providers/cncf/kubernetes/decorators/kubernetes.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py index 24574c4a2c03e..1e1c0a1e590ce 100644 --- a/airflow/example_dags/example_kubernetes_decorator.py +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -22,7 +22,7 @@ from datetime import datetime from airflow import DAG -from airflow.decorators import dag, task +from airflow.decorators import task with DAG( dag_id='example_kubernetes_decorator', diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 70142045b0abf..bb453814e130d 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -78,13 +78,13 @@ def __init__( self.pickling_library = pickle # Image, name and namespace are all required. - if not 'image' in kwargs: + if 'image' not in kwargs: kwargs['image'] = 'python:3.8-slim-buster' - if not 'name' in kwargs: + if 'name' not in kwargs: kwargs['name'] = f'k8s_airflow_pod_{uuid.uuid4().hex}' - if not 'namespace' in kwargs: + if 'namespace' not in kwargs: kwargs['namespace'] = 'default' kwargs_to_upstream = { @@ -93,7 +93,7 @@ def __init__( "op_kwargs": kwargs["op_kwargs"], } - super().__init__(**kwargs) + super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs) def execute(self, context: 'Context'): From e5f32b8977cc66dfe041f8a7d4f0df0ae0bef322 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 31 Jan 2022 21:37:37 -0500 Subject: [PATCH 14/27] Changes to move kubernetes decorator logic out of airflow/decorators --- airflow/decorators/__init__.py | 3 --- airflow/decorators/__init__.pyi | 5 +---- .../providers/cncf/kubernetes/decorators/kubernetes.py | 8 +------- airflow/providers/cncf/kubernetes/provider.yaml | 4 ++++ 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 9f6a609cb899d..ac1295baa7874 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -22,7 +22,6 @@ from airflow.decorators.python_virtualenv import virtualenv_task from airflow.decorators.task_group import task_group from airflow.models.dag import dag -from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task from airflow.providers_manager import ProvidersManager # Please keep this in sync with the .pyi's __all__. @@ -34,7 +33,6 @@ "task_group", "python_task", "virtualenv_task", - "kubernetes_task", ] @@ -43,7 +41,6 @@ class TaskDecoratorCollection: python: Any = staticmethod(python_task) virtualenv = staticmethod(virtualenv_task) - kubernetes = staticmethod(kubernetes_task) __call__ = python # Alias '@task' to '@task.python'. diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index f71eaf32c790e..e177cf1988347 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -127,9 +127,8 @@ class TaskDecoratorCollection: """ @overload def virtualenv(self, python_callable: F) -> F: ... - @overload def kubernetes( - python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs ) -> TaskDecorator: """Wraps a function to be executed on a k8s pod using KubernetesPodOperator @@ -143,8 +142,6 @@ class TaskDecoratorCollection: Defaults to False. :type multiple_outputs: bool """ - @overload - def kubernetes(self, python_callable: F) -> F: ... # [START decorator_signature] def docker( self, diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index bb453814e130d..0db192fe0c696 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -87,13 +87,7 @@ def __init__( if 'namespace' not in kwargs: kwargs['namespace'] = 'default' - kwargs_to_upstream = { - "python_callable": kwargs["python_callable"], - "op_args": kwargs["op_args"], - "op_kwargs": kwargs["op_kwargs"], - } - - super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs) + super().__init__(**kwargs) def execute(self, context: 'Context'): diff --git a/airflow/providers/cncf/kubernetes/provider.yaml b/airflow/providers/cncf/kubernetes/provider.yaml index 6ace9cd5625dd..45429420346cb 100644 --- a/airflow/providers/cncf/kubernetes/provider.yaml +++ b/airflow/providers/cncf/kubernetes/provider.yaml @@ -73,3 +73,7 @@ hook-class-names: # deprecated - to be removed after providers add dependency o connection-types: - hook-class-name: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook connection-type: kubernetes + +task-decorators: + - class-name: airflow.providers.cncf.kubernetes.decorators.kubernetes.kubernetes_task + name: kubernetes From eb8ccf102290aca4d05ec6b7df5f21945588972b Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 6 Feb 2022 15:35:12 -0500 Subject: [PATCH 15/27] Reverted back change in __init__.pyi --- airflow/decorators/__init__.pyi | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index e177cf1988347..59bc68eff080c 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -126,22 +126,7 @@ class TaskDecoratorCollection: such as transmission a large amount of XCom to TaskAPI. """ @overload - def virtualenv(self, python_callable: F) -> F: ... - def kubernetes( - self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs - ) -> TaskDecorator: - """Wraps a function to be executed on a k8s pod using KubernetesPodOperator - - Also accepts any argument that KubernetesPodOperator will via ``kwargs``. - - :param python_callable: Function to decorate - :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool - """ + def virtualenv(self, python_callable: Function) -> Function: ... # [START decorator_signature] def docker( self, @@ -240,5 +225,20 @@ class TaskDecoratorCollection: :param cap_add: Include container capabilities """ # [END decorator_signature] + def kubernetes( + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ) -> TaskDecorator: + """Wraps a function to be executed on a k8s pod using KubernetesPodOperator + + Also accepts any argument that KubernetesPodOperator will via ``kwargs``. + + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + """ task: TaskDecoratorCollection From 875be8ac339cf3292be27e884c71f821ab277991 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 6 Feb 2022 21:42:24 -0500 Subject: [PATCH 16/27] Added kubernetespodopertor parameter to kubernetes decorator --- airflow/decorators/__init__.pyi | 105 +++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 59bc68eff080c..2e7564d7bec7a 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -226,7 +226,52 @@ class TaskDecoratorCollection: """ # [END decorator_signature] def kubernetes( - self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + self, + python_callable: Optional[Callable] = None, + multiple_outputs: Optional[bool] = None, + namespace: Optional[str] = None, + image: Optional[str] = None, + name: Optional[str] = None, + random_name_suffix: Optional[bool] = True, + cmds: Optional[List[str]] = None, + arguments: Optional[List[str]] = None, + ports: Optional[List[k8s.V1ContainerPort]] = None, + volume_mounts: Optional[List[k8s.V1VolumeMount]] = None, + volumes: Optional[List[k8s.V1Volume]] = None, + env_vars: Optional[List[k8s.V1EnvVar]] = None, + env_from: Optional[List[k8s.V1EnvFromSource]] = None, + secrets: Optional[List[Secret]] = None, + in_cluster: Optional[bool] = None, + cluster_context: Optional[str] = None, + labels: Optional[Dict] = None, + reattach_on_restart: bool = True, + startup_timeout_seconds: int = 120, + get_logs: bool = True, + image_pull_policy: Optional[str] = None, + annotations: Optional[Dict] = None, + resources: Optional[k8s.V1ResourceRequirements] = None, + affinity: Optional[k8s.V1Affinity] = None, + config_file: Optional[str] = None, + node_selectors: Optional[dict] = None, + node_selector: Optional[dict] = None, + image_pull_secrets: Optional[List[k8s.V1LocalObjectReference]] = None, + service_account_name: Optional[str] = None, + is_delete_operator_pod: bool = True, + hostnetwork: bool = False, + tolerations: Optional[List[k8s.V1Toleration]] = None, + security_context: Optional[Dict] = None, + dnspolicy: Optional[str] = None, + schedulername: Optional[str] = None, + full_pod_spec: Optional[k8s.V1Pod] = None, + init_containers: Optional[List[k8s.V1Container]] = None, + log_events_on_failure: bool = False, + do_xcom_push: bool = False, + pod_template_file: Optional[str] = None, + priority_class_name: Optional[str] = None, + pod_runtime_info_envs: Optional[List[PodRuntimeInfoEnv]] = None, + termination_grace_period: Optional[int] = None, + configmaps: Optional[List[str]] = None, + **kwargs, ) -> TaskDecorator: """Wraps a function to be executed on a k8s pod using KubernetesPodOperator @@ -239,6 +284,64 @@ class TaskDecoratorCollection: with index as key. Dict will unroll to xcom values with keys as XCom keys. Defaults to False. :type multiple_outputs: bool + :param namespace: the namespace to run within kubernetes. + :param image: Docker image you wish to launch. Defaults to hub.docker.com, + but fully qualified URLS will point to custom repositories. (templated) + :param name: name of the pod in which the task will run, will be used (plus a random + suffix if random_name_suffix is True) to generate a pod id (DNS-1123 subdomain, + containing only [a-z0-9.-]). + :param random_name_suffix: if True, will generate a random suffix. + :param cmds: entrypoint of the container. (templated) + The docker images's entrypoint is used if this is not provided. + :param arguments: arguments of the entrypoint. (templated) + The docker image's CMD is used if this is not provided. + :param ports: ports for launched pod. + :param volume_mounts: volumeMounts for launched pod. + :param volumes: volumes for launched pod. Includes ConfigMaps and PersistentVolumes. + :param env_vars: Environment variables initialized in the container. (templated) + :param secrets: Kubernetes secrets to inject in the container. + They can be exposed as environment vars or files in a volume. + :param in_cluster: run kubernetes client with in_cluster configuration. + :param cluster_context: context that points to kubernetes cluster. + Ignored when in_cluster is True. If None, current-context is used. + :param reattach_on_restart: if the scheduler dies while the pod is running, reattach and monitor + :param labels: labels to apply to the Pod. (templated) + :param startup_timeout_seconds: timeout in seconds to startup the pod. + :param get_logs: get the stdout of the container as logs of the tasks. + :param image_pull_policy: Specify a policy to cache or always pull an image. + :param annotations: non-identifying metadata you can attach to the Pod. + Can be a large range of data, and can include characters + that are not permitted by labels. + :param resources: A dict containing resources requests and limits. + Possible keys are request_memory, request_cpu, limit_memory, limit_cpu, + and limit_gpu, which will be used to generate airflow.kubernetes.pod.Resources. + See also kubernetes.io/docs/concepts/configuration/manage-compute-resources-container + :param affinity: A dict containing a group of affinity scheduling rules. + :param config_file: The path to the Kubernetes config file. (templated) + If not specified, default value is ``~/.kube/config`` + :param node_selector: A dict containing a group of scheduling rules. + :param image_pull_secrets: Any image pull secrets to be given to the pod. + If more than one secret is required, provide a + comma separated list: secret_a,secret_b + :param service_account_name: Name of the service account + :param is_delete_operator_pod: What to do when the pod reaches its final + state, or the execution is interrupted. If True (default), delete the + pod; if False, leave the pod. + :param hostnetwork: If True enable host networking on the pod. + :param tolerations: A list of kubernetes tolerations. + :param security_context: security options the pod should run with (PodSecurityContext). + :param dnspolicy: dnspolicy for the pod. + :param schedulername: Specify a schedulername for the pod + :param full_pod_spec: The complete podSpec + :param init_containers: init container for the launched Pod + :param log_events_on_failure: Log the pod's events if a failure occurs + :param do_xcom_push: If True, the content of the file + /airflow/xcom/return.json in the container will also be pushed to an + XCom when the container completes. + :param pod_template_file: path to pod template file (templated) + :param priority_class_name: priority class name for the launched Pod + :param termination_grace_period: Termination grace period if task killed in UI, + defaults to kubernetes default """ task: TaskDecoratorCollection From d97ef0cb26b7695bb7e953ad9393b4461c6c54d2 Mon Sep 17 00:00:00 2001 From: subkanthi Date: Wed, 16 Feb 2022 11:27:48 -0500 Subject: [PATCH 17/27] Removed type definitions in kubernetes decorator --- airflow/providers/cncf/kubernetes/decorators/kubernetes.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 0db192fe0c696..98762a8303076 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -52,17 +52,13 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator): Wraps a Python callable and executes in a kubernetes pod :param python_callable: A reference to an object that is callable - :type python_callable: python callable :param op_kwargs: a dictionary of keyword arguments that will get unpacked in your function (templated) - :type op_kwargs: dict :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) - :type op_args: list :param multiple_outputs: if set, function return value will be unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. Defaults to False. - :type multiple_outputs: bool """ template_fields: Sequence[str] = ('op_args', 'op_kwargs') @@ -140,12 +136,10 @@ def kubernetes_task( Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG. :param python_callable: Function to decorate - :type python_callable: Optional[Callable] :param multiple_outputs: if set, function return value will be unrolled to multiple XCom values. List/Tuples will unroll to xcom values with index as key. Dict will unroll to xcom values with keys as XCom keys. Defaults to False. - :type multiple_outputs: bool """ return task_decorator_factory( python_callable=python_callable, From 442e4ce74f4bdf07ba65f3e60e41ebdd87c49178 Mon Sep 17 00:00:00 2001 From: subkanthi Date: Mon, 28 Feb 2022 22:11:00 -0500 Subject: [PATCH 18/27] Cherry pick changes from 21881 --- airflow/decorators/__init__.pyi | 2 +- airflow/decorators/base.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index afc35ab434294..4d572929c03a0 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -20,7 +20,7 @@ # necessarily exist at run time. See "Creating Custom @task Decorators" # documentation for more details. -from typing import Any, Dict, Iterable, List, Mapping, Optional, Union, overload +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union, overload from airflow.decorators.base import Function, Task, TaskDecorator from airflow.decorators.branch_python import branch_task diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index d661b5727e2e8..155e7264e727c 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -476,7 +476,7 @@ def __call__(self, python_callable: Function) -> Task[Function]: def __call__( self, *, - multiple_outputs: Optional[bool], + multiple_outputs: Optional[bool] = None, **kwargs: Any, ) -> Callable[[Function], Task[Function]]: """For the decorator factory ``@task()`` case.""" From cf6653bf3a8713b80773916ea41d629c2f4ec285 Mon Sep 17 00:00:00 2001 From: Kanthi Date: Sun, 20 Mar 2022 15:45:46 -0400 Subject: [PATCH 19/27] Update airflow/decorators/__init__.pyi Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/decorators/__init__.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 4d572929c03a0..ff9e64a11273a 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -250,7 +250,7 @@ class TaskDecoratorCollection: namespace: Optional[str] = None, image: Optional[str] = None, name: Optional[str] = None, - random_name_suffix: Optional[bool] = True, + random_name_suffix: bool = True, cmds: Optional[List[str]] = None, arguments: Optional[List[str]] = None, ports: Optional[List[k8s.V1ContainerPort]] = None, From a1f7ad107b112cd50a7b934b2f37fe0fc27797b4 Mon Sep 17 00:00:00 2001 From: Kanthi Date: Sun, 20 Mar 2022 15:46:05 -0400 Subject: [PATCH 20/27] Update airflow/decorators/__init__.pyi Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/decorators/__init__.pyi | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index ff9e64a11273a..7a19784896762 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -296,12 +296,9 @@ class TaskDecoratorCollection: Also accepts any argument that KubernetesPodOperator will via ``kwargs``. :param python_callable: Function to decorate - :type python_callable: Optional[Callable] :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + unrolled to multiple XCom values. Dict will unroll to XCom values with keys as XCom keys. Defaults to False. - :type multiple_outputs: bool :param namespace: the namespace to run within kubernetes. :param image: Docker image you wish to launch. Defaults to hub.docker.com, but fully qualified URLS will point to custom repositories. (templated) From 775742a3e54f2d8fce346950a6e4f64bf3b9ab71 Mon Sep 17 00:00:00 2001 From: Kanthi Date: Sun, 20 Mar 2022 15:46:09 -0400 Subject: [PATCH 21/27] Update airflow/providers/cncf/kubernetes/decorators/kubernetes.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/providers/cncf/kubernetes/decorators/kubernetes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 98762a8303076..67c21c7863cca 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -137,8 +137,7 @@ def kubernetes_task( :param python_callable: Function to decorate :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys. Defaults to False. """ return task_decorator_factory( From 83dd4072735a092069a8b08a632b5a3e6714e483 Mon Sep 17 00:00:00 2001 From: Kanthi Date: Sun, 20 Mar 2022 15:46:17 -0400 Subject: [PATCH 22/27] Update airflow/example_dags/example_kubernetes_decorator.py Co-authored-by: Josh Fell <48934154+josh-fell@users.noreply.github.com> --- airflow/example_dags/example_kubernetes_decorator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py index 1e1c0a1e590ce..3f248d8695b76 100644 --- a/airflow/example_dags/example_kubernetes_decorator.py +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -29,6 +29,7 @@ schedule_interval=None, start_date=datetime(2021, 1, 1), tags=['example'], + catchup=False, ) as dag: @task.kubernetes(image='python:3.8-slim-buster', name='k8s_test', namespace='default') From 3ca339c4619e14a5253331ec201eb4052821c0de Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 21 Mar 2022 11:13:35 +0800 Subject: [PATCH 23/27] Make @task.kubernetes arguments keyword-only --- airflow/decorators/__init__.pyi | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 6c6a0af3404a8..fe0fa7db5aa4c 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -253,6 +253,7 @@ class TaskDecoratorCollection: # [END decorator_signature] def kubernetes( self, + *, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, namespace: Optional[str] = None, From 65996b74b57b32a776e3581ef88e2b2992d47a19 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 10 Apr 2022 17:55:19 -0400 Subject: [PATCH 24/27] Throw exception if image is not passed to kubernetes decorator. --- airflow/providers/cncf/kubernetes/decorators/kubernetes.py | 3 ++- tests/providers/cncf/kubernetes/decorators/test_kubernetes.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 67c21c7863cca..8a54c05d3cd35 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -25,6 +25,7 @@ from kubernetes.client import models as k8s +from airflow import AirflowException from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script @@ -75,7 +76,7 @@ def __init__( # Image, name and namespace are all required. if 'image' not in kwargs: - kwargs['image'] = 'python:3.8-slim-buster' + raise AirflowException("Image required for kubernetes decorator") if 'name' not in kwargs: kwargs['name'] = f'k8s_airflow_pod_{uuid.uuid4().hex}' diff --git a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py index e502f5e934e53..9ee40911f6d79 100644 --- a/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py +++ b/tests/providers/cncf/kubernetes/decorators/test_kubernetes.py @@ -43,7 +43,7 @@ def k8s_decorator_func(): @mock.patch("airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator.execute") def test_task_creation_default_params(self, execute_mock, dag_maker): - @task.kubernetes() + @task.kubernetes(image='python:3.8-slim-buster') def k8s_decorator_func(): print("decorator func") From 6626060a10bd4a1ab69dac6e6d0c928dbc764e35 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Sun, 10 Apr 2022 22:12:52 -0400 Subject: [PATCH 25/27] Add image parameter to kubernetes decorator. --- airflow/example_dags/example_kubernetes_decorator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/example_dags/example_kubernetes_decorator.py b/airflow/example_dags/example_kubernetes_decorator.py index 3f248d8695b76..35480b7119d99 100644 --- a/airflow/example_dags/example_kubernetes_decorator.py +++ b/airflow/example_dags/example_kubernetes_decorator.py @@ -39,7 +39,7 @@ def execute_in_k8s_pod(): print("Hello from k8s pod") time.sleep(2) - @task.kubernetes() + @task.kubernetes(image='python:3.8-slim-buster') def print_pattern(): n = 5 for i in range(0, n): From 453e32ab030f84e6dc672248a8ca5163663f2db1 Mon Sep 17 00:00:00 2001 From: Kanthi Subramanian Date: Mon, 11 Apr 2022 09:22:43 -0400 Subject: [PATCH 26/27] Kubernetes decorator -set image parameter as required --- airflow/decorators/__init__.pyi | 7 ++----- airflow/providers/cncf/kubernetes/decorators/kubernetes.py | 6 +----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 17c1fba7ceee6..77cf04dc1a8a0 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -188,8 +188,7 @@ class TaskDecoratorCollection: Dict will unroll to XCom values with keys as XCom keys. Defaults to False. :param use_dill: Whether to use dill or pickle for serialization :param python_command: Python command for executing functions, Default: python3 - :param image: Docker image from which to create the container. - If image tag is omitted, "latest" will be used. + :param image: Docker image from which to create the container.(Required) :param api_version: Remote API version. Set to ``auto`` to automatically detect the server's version. :param container_name: Name of the container. Optional (templated) @@ -248,10 +247,9 @@ class TaskDecoratorCollection: def kubernetes( self, *, - python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, namespace: Optional[str] = None, - image: Optional[str] = None, + image: str, name: Optional[str] = None, random_name_suffix: bool = True, cmds: Optional[List[str]] = None, @@ -298,7 +296,6 @@ class TaskDecoratorCollection: Also accepts any argument that KubernetesPodOperator will via ``kwargs``. - :param python_callable: Function to decorate :param multiple_outputs: if set, function return value will be unrolled to multiple XCom values. Dict will unroll to XCom values with keys as XCom keys. Defaults to False. diff --git a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py index 8a54c05d3cd35..32eeb90ee376d 100644 --- a/airflow/providers/cncf/kubernetes/decorators/kubernetes.py +++ b/airflow/providers/cncf/kubernetes/decorators/kubernetes.py @@ -25,7 +25,6 @@ from kubernetes.client import models as k8s -from airflow import AirflowException from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script @@ -74,10 +73,7 @@ def __init__( ) -> None: self.pickling_library = pickle - # Image, name and namespace are all required. - if 'image' not in kwargs: - raise AirflowException("Image required for kubernetes decorator") - + # Set defaults for name and namespace. if 'name' not in kwargs: kwargs['name'] = f'k8s_airflow_pod_{uuid.uuid4().hex}' From 3b12142774d76de06a87ca4ab6fe60c2df1adc7e Mon Sep 17 00:00:00 2001 From: Kanthi Date: Mon, 11 Apr 2022 21:28:19 -0400 Subject: [PATCH 27/27] Update airflow/decorators/__init__.pyi Co-authored-by: Tzu-ping Chung --- airflow/decorators/__init__.pyi | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index 77cf04dc1a8a0..21c62faab9d71 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -188,7 +188,7 @@ class TaskDecoratorCollection: Dict will unroll to XCom values with keys as XCom keys. Defaults to False. :param use_dill: Whether to use dill or pickle for serialization :param python_command: Python command for executing functions, Default: python3 - :param image: Docker image from which to create the container.(Required) + :param image: Docker image from which to create the container. (Required) :param api_version: Remote API version. Set to ``auto`` to automatically detect the server's version. :param container_name: Name of the container. Optional (templated)