From 0f5c17b161ca315498fb823491cf5d32a9730a85 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 12 May 2023 10:23:54 -0700 Subject: [PATCH 1/2] Make KPO tolerant of hooks that don't have xcom methods We probably should not be configuring xcom settings from the hook in this way but for better or worse we've done it, first in https://github.com/apache/airflow/pull/26766 and again in https://github.com/apache/airflow/pull/28125. The problem is that other operators derived from KPO, e.g. GKEStartPodOperator, may use a different hook that doesn't have this xcom-specific methods. So KPO needs to tolerate that. --- airflow/providers/cncf/kubernetes/operators/pod.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index dac7f1b542db3..43de55318fa98 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -861,10 +861,16 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: pod = secret.attach_to_pod(pod) if self.do_xcom_push: self.log.debug("Adding xcom sidecar to task %s", self.task_id) + sidecar_container_image = None + sidecar_container_resources = None + if hasattr(self.hook, "get_xcom_sidecar_container_image"): + sidecar_container_image = self.hook.get_xcom_sidecar_container_image() + if hasattr(self.hook, "get_xcom_sidecar_container_resources"): + sidecar_container_resources = self.hook.get_xcom_sidecar_container_resources() pod = xcom_sidecar.add_xcom_sidecar( pod, - sidecar_container_image=self.hook.get_xcom_sidecar_container_image(), - sidecar_container_resources=self.hook.get_xcom_sidecar_container_resources(), + sidecar_container_image=sidecar_container_image, + sidecar_container_resources=sidecar_container_resources, ) labels = self._get_ti_pod_labels(context) From cf1ba2749ea57d88224d71a099fa9b68f7036ea0 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 12 May 2023 10:32:08 -0700 Subject: [PATCH 2/2] extract logic to method --- .../cncf/kubernetes/operators/pod.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 43de55318fa98..85616315ffd16 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -861,18 +861,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: pod = secret.attach_to_pod(pod) if self.do_xcom_push: self.log.debug("Adding xcom sidecar to task %s", self.task_id) - sidecar_container_image = None - sidecar_container_resources = None - if hasattr(self.hook, "get_xcom_sidecar_container_image"): - sidecar_container_image = self.hook.get_xcom_sidecar_container_image() - if hasattr(self.hook, "get_xcom_sidecar_container_resources"): - sidecar_container_resources = self.hook.get_xcom_sidecar_container_resources() - pod = xcom_sidecar.add_xcom_sidecar( - pod, - sidecar_container_image=sidecar_container_image, - sidecar_container_resources=sidecar_container_resources, - ) - + pod = self._add_xcom_sidecar(pod) labels = self._get_ti_pod_labels(context) self.log.info("Building pod %s with labels: %s", pod.metadata.name, labels) @@ -889,6 +878,22 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod: pod_mutation_hook(pod) return pod + def _add_xcom_sidecar(self, pod): + """Add xcom sidecar to pod.""" + sidecar_container_image = None + sidecar_container_resources = None + # self.hook may not be subclass of KubernetesHook (see GKEStartPodOperator) so we must + # check to make sure these methods exist before calling + if hasattr(self.hook, "get_xcom_sidecar_container_image"): + sidecar_container_image = self.hook.get_xcom_sidecar_container_image() + if hasattr(self.hook, "get_xcom_sidecar_container_resources"): + sidecar_container_resources = self.hook.get_xcom_sidecar_container_resources() + return xcom_sidecar.add_xcom_sidecar( + pod, + sidecar_container_image=sidecar_container_image, + sidecar_container_resources=sidecar_container_resources, + ) + def dry_run(self) -> None: """ Prints out the pod definition that would be created by this operator.